이 기사는 데이터 과학 Blogathon
개요
Apache Spark는 빅 데이터와 관련된 모든 종류의 프로젝트에서 가장 인기 있고 자주 접하게 되는 빅 데이터 처리 프레임워크입니다. 그것은 작업의 속도와 자신의 생각을 표현하는 개발자의 단순성을 성공적으로 결합합니다.
개발자는 충분히 높은 수준의 데이터로 작업하며 예를 들어 코드 한 줄만 작성하여 두 데이터 세트를 연결하는 데 어려움이 없는 것 같습니다.
orderDF.join(customersDF, ordersDF ["customer_id"] == customersDF["id"], "left_outer")
하지만 생각해 보세요. 두 개의 데이터 세트가 연결되면 클러스터에서 어떤 일이 발생합니까? 이 데이터 세트는 완전히 클러스터 노드에 있을 수도 있고 아닐 수도 있습니다. 일반적으로 Apache Spark는 모든 것을 빠르게 처리하지만, 특히 데이터가 정말 많은 경우에는 여전히 아래 수준에서 무슨 일이 일어나고 있는지 이해하고 이 지식을 사용하여 Apache Spark가 최대한 작동하도록 해야 합니다.
오늘 우리는 애플리케이션을 빠르게 실행하고 요청한 모든 리소스를 사용하는 방법에 대해 이야기할 것입니다. 이 기사에서는 정적 프로비저닝을 사용하여 Yarn 클러스터에서 Apache Spark 애플리케이션을 실행하는 Spark SQL 모듈에 주로 초점을 맞출 것입니다. 그러나 일반적인 아이디어는 다른 초기 데이터에도 적용될 수 있습니다. Spark 2.3의 모든 혁신을 더 잘 이해하기 위해 여기에서 Spark 2.4/3를 살펴보고 있습니다.
데이터와 데이터의 위치
Spark가 데이터 작업을 위해 제공하는 추상화부터 시작하겠습니다. 이것이 RDD( 탄력적인 분산 데이터 세트 ). 이 기사의 목적을 위해 DataFrame 또는 DataSet으로 작업하는 것은 중요하지 않습니다.
이미지 1
따라서 개발자의 경우 데이터 집합이 단일 개체로 표시되고 클러스터의 일부 실행기에서 일부 스레드에서 개별적으로 부분(블록)으로 처리됩니다. 블록은 처리의 최소 단위이며 실행자는 블록과 이 데이터 블록으로 수행해야 할 작업을 알려주는 명령을 받습니다.
클러스터에서 Apache Spark 앱이 작동하는 방식
높은 수준에서 모든 Spark 응용 프로그램은 작동 시 드라이버로 구성됩니다. 즉, 주() 기능을 실행하는 프로그램과 클러스터 노드에서 실행되는 실행 프로그램입니다. Executors는 보편적인 군인이며 데이터 덩어리(블록)와 명령을 받아 실행하고 다음 명령을 받기 위해 운전자에게 완료를 보고합니다. 각 실행기는 하나 이상의 처리 스레드를 실행할 수 있으며, 이 경우 각 스레드는 다른 스레드와 독립적으로 자체 데이터 블록을 처리합니다. 따라서 애플리케이션을 시작할 때 클러스터 관리자에서 5개의 코어(스레드)가 있는 4개의 실행기를 주문했다면 매 순간에 20 * 20 = XNUMX개의 스레드가 있고 기껏해야 XNUMX개의 데이터 블록을 동시에 처리할 수 있습니다.
따라서 각 작업은 다음과 같이 실행됩니다.
-
num_executors – 데이터 처리 스레드가 시작될 개별 JVM 프로세스의 수입니다(동일한 클러스터 노드 또는 다른 노드에 둘 다 위치할 수 있음). 프로세스는 응용 프로그램이 끝날 때까지 실행됩니다.
-
executor_cores 각 실행기에서 실행 중인 동시 스레드 수입니다. 하나의 스레드는 한 번에 하나의 데이터 블록을 처리합니다.
이미지 2
Apache Spark 애플리케이션 작동 방식
Spark History(Spark 애플리케이션의 실행 로그를 편리한 형식으로 표시하기 위한 웹 서버)에서 다음과 같이 보입니다.
여기에 각각 XNUMX개의 처리 스레드가 있는 XNUMX개의 실행기가 있습니다.
Shuffle Apache Spark 성능 최적화
그래서 우리는 이러한 데이터 블록을 병렬로 처리할 수 있는 N개의 데이터 블록과 P개의 스레드(작업자)가 있다는 것을 알아냈습니다.
그리고 이러한 블록이 애플리케이션이 끝날 때까지 지속된다면 모든 것이 괜찮을 것입니다. 예를 들어 두 테이블을 키로 조인(JOIN)하고 키로 그룹화(GROUP BY)합니다. 이 경우 잘 알려진 MapReduce 패턴은 모든 사람에게 적용됩니다. 키별로 전체 집합의 데이터가 새 데이터 블록으로 재배포되어 동일한 키를 가진 행이 하나의 블록에만 있게 됩니다. 이 프로세스를 Spark에서는 Shuffle이라고 합니다. 나는 왜 그것을 대문자로 했는가? 매우 복잡하고 비용이 많이 드는 프로세스이기 때문에 수행자의 메모리 소비, 클러스터 노드의 디스크 메모리 소비 및 클러스터 노드 간의 네트워크 교환이 증가합니다. 애벌레가 나비로 변하는 모습을 연상케 합니다. 모든 것이 분해되어 새로운 모습으로 재조립되며 에너지 집약적입니다.
작업을 단계로 나누기
Spark에서 하나의 Shuffle에서 다른 Shuffle로 블록을 처리하는 것을 Stage라고 합니다. 셔플하기 전에 모든 블록이 병렬로 처리되고 셔플링 후에 병렬로 처리되지만 이전 단계의 끝 부분에 있는 모든 블록이 이 프로세스를 통과할 때까지 새 단계가 시작되지 않습니다. 따라서 블록을 병렬로 처리할 때 스테이지 간의 경계는 대기 장소입니다. 또한 한 단계 내에서 한 블록에 대한 모든 작업(작업)은 하나의 스레드 내에서 순차적으로 발생합니다. 즉, 블록은 네트워크 어디에서도 전송되지 않고 모든 블록이 병렬로 처리됩니다. 스테이지 경계 내 블록의 수는 변경되지 않은 것으로 나타났습니다.
이미지 3
작업은 단계로 나뉩니다
우리는 다음과 같은 그림에 도달했습니다. 모든 작업은 단계로 나뉘며 각 단계 내에서 블록의 수는 일정하고 동일합니다. 그리고 여기서부터 재미가 시작됩니다. 우리는 작업자의 수(P = 실행자 * 코어)를 알고 있지만 각 단계에 얼마나 많은 블록이 있을 것인지는 애플리케이션의 성능에 직접적인 영향을 미치는 질문입니다. 결국, 블록이 많고 수행자가 적은 경우 각 수행자는 여러 블록을 순차적으로 처리하고 그 반대도 마찬가지입니다. 블록이 적고 수행자가 많으면 일부 수행자는 유휴 상태이고 나머지는 유휴 상태가 됩니다. 쉬지 않고 일하고 있습니다. 여기서 가장 흥미로운 점은 응용 프로그램이 느리게 실행될 때 더 많은 실행기를 제공하려고 하지만 이 경우 성능이 증가하지 않는다는 것입니다.
작업량을 단계별로 파악하는 것부터 시작하겠습니다. 이하에서는 단순화를 위해 하나의 데이터 집합의 블록만 고려합니다. 주어진 시간에 수행자는 관련 없는 여러 단계를 처리할 수 있습니다. 예를 들어, JOIN 전에 두 데이터 세트는 서로 독립적으로 처리되므로 실행자를 서로 나눕니다. 이 경우 처리 장치의 수가 합이 됩니다. 그러나 우리의 목적을 위해서는 한 세트의 데이터에서 무슨 일이 일어나고 있는지 이해하는 것이 필요합니다. 첫 번째 단계에서 모든 것은 데이터 세트의 출처에 따라 다릅니다. 예를 들어 HDFS에서 parquet 파일의 디렉토리를 읽는 경우 첫 번째 단계의 블록 수는 일반적으로 (로드되는 디렉토리에서 모든 .parquet 파일을 구성하는 HDFS 블록 수)와 같습니다. 즉, 이 경우 각 HDFS 블록은 처리를 위한 별도의 데이터 블록을 나타냅니다. 하지만 이 블록 분배는 스테이지가 끝날 때까지 유지된다는 점을 잊지 마세요. 여기에 좋은 예가 있습니다.
150,000개 항목이 있는 HDFS에서 작은 파일을 읽고 있습니다. 전체 파일은 하나의 HDFS 블록에 맞습니다. 따라서 첫 번째 단계에서는 데이터 블록이 하나만 있으므로 한 명의 수행자만 이 블록으로 작업할 수 있습니다. 그러나 변환의 논리에 따르면 각 줄에는 필드 기간(시청 초 수)이 포함되어 있으며 출력의 각 줄에 이 줄에서 시청한 초 수만큼의 줄을 곱해야 합니다.
viDF = spark.read.parquet("/tst/vi/") viDF.createOrReplaceTempView("ViewingInterval") spark.sql("""선택 t.*, explode(get_list_of_seconds(duatation)) as secondNumber from ViewingInterval""" )
테스트 데이터의 변환이 빠르게 작동하지 않습니다. Spark History를 살펴보면 다음과 같습니다.
첫 번째 단계에서 하나의 데이터 블록
작업 = 1은 데이터 블록이 하나뿐이므로 이 단계에서 작업이 하나만 있음을 의미합니다. 입력에서 2MB의 데이터가 표시되고 출력에서 이미 확장된 1GB 데이터 세트가 있습니다. 그리고 이 모든 작업은 하나의 스레드에서 수행되며 이 단계에서는 더 이상 작업이 없기 때문에 나머지는 유휴 상태입니다. 결국 우리가해야 할 일은 좁은 의존성이며 이러한 이유로 단계를 중단하지 않고 데이터를 읽는 동일한 단계에서 수행됩니다. 우리가 이미 알고 있듯이 스테이지의 프레임워크 내에서 블록의 수는 변경되지 않습니다. 이 경우, 우리는 (입력 데이터 세트가 작고 셔플링이 빠르게 일어나기 때문에) 함수 repartition(N)을 사용하여 이 단계를 둘로 쉽게 나눌 수 있습니다. 출력에서 크기가 거의 같습니다. 그리고 셔플(Shuffle)하므로 새로운 스테이지가 시작된다는 의미입니다.
viDF = spark.read.parquet("/tst/vi/") viDF.repartition(60).createOrReplaceTempView("ViewingInterval") spark.sql("""t.* 선택, 폭발(get_list_of_seconds(duatation)) as secondNumber ViewingInterval에서""")
Spark 기록을 살펴보겠습니다.
이제 처리가 병렬로 실행됩니다.
두 번째 단계 - 이제 재분할 후 폭발적으로 증가했습니다. 우리는 60개의 작업(데이터 블록)을 가지고 있으며 모든 수행자는 이제 작동 중이며 유휴 상태가 아닙니다. 변신 시간이 거의 절반으로 단축되었습니다. 우리의 임무는 가동 중지 시간이 없고 모든 수행자가 작동하는지 확인하는 것입니다. 그렇지 않으면 나중에 사용하지 않는 클러스터에서 리소스를 가져오는 이유가 무엇입니까?
우리는 첫 번째 단계를 알아냈고 repartition(N)의 도움으로 모든 단계를 둘로 나누는 방법도 배웠습니다. 두 셔플 사이에 있는 내부 단계를 처리해 보겠습니다. 여기에서 모든 것은 spark spark.sql.shuffle.partitions(기본값 200) 매개변수에 의해 결정됩니다. 더 정확하게는 AQE Spark가 도입되면서 이 금액 자체를 규제하는 법을 배웠기 때문에 결정했습니다. 따라서 모든 내부 단계는 데이터 블록의 spark.sql.shuffle.partitions로 구성됩니다. 그러나 여기에서도 모든 것이 그렇게 순조로운 것은 아닙니다. 데이터가 많지 않으면 이 매개변수를 줄여야 하고 데이터가 많으면 늘려야 합니다. 그리고 Spark 2.3의 경우 데이터에 따라 일종의 중간 지점을 찾아야 합니다.
데이터가 거의 없고 기본적으로 spark.sql.shuffle.partitions = 200인 경우의 예를 들어보겠습니다. Spark History를 보면 데이터 세트가 185줄로만 구성되어 있고 셔플하는 동안 200블록으로 나누어져 있음을 알 수 있습니다. 여기서는 200블록으로 충분하지 않습니다. 수행자의 정말 유용한 작업은 여기에서 녹색으로 표시됩니다. 즉, 하나의 레코드에서 하나의 데이터 블록을 처리하는 수행자의 총 작업 시간 중 유용 시간이 <10%인 것으로 나타났습니다. 나머지 시간은 대기 중이며 직렬화 해제됩니다.
마지막 단계에서는 어떻게 됩니까? 이것은 다시 출력 위치에 따라 다릅니다. 변환 데이터. 예를 들어, 모든 것을 쪽모이 세공 파일로 디렉토리에 쓰고 싶습니다. 아무 것도 하지 않고 셔플 후에 이 작업을 수행하면 프로그램 실행 후 이 디렉토리에서 200개의 파일을 찾을 수 있습니다. 왜요? 셔플 후 기본적으로 spark.sql.shuffle.partitions = 200 블록을 얻었고 하나의 블록이 하나의 스레드에서 처리되기 때문에 별도의 파일에 자체적으로 씁니다.
일반적으로 개발자는 여기에서 HDFS의 파일 수를 제어하고 DataFrame coalesce(N)에 저장할 때 메서드를 호출합니다. 이 방법은 단순히 우리 세트의 각 블록을 N의 새 블록 중 하나로 입력합니다. 그것은 합쳐져(), 실제로는 repartition()과 달리 셔플링으로 이어지지 않으므로 단계를 중단하지 않고 우리 단계에서 N개의 데이터 블록이 있도록 만듭니다. 그러나 이것이 이 단계에서 N 명의 공연자만을 위한 작업이 될 것이라는 점입니다. 모든 것을 하나의 파일에 저장하기로 결정했다면 - 하나의 스트림만 작동합니다. 첫 번째 단계에 대한 추론을 상기해 보겠습니다. 마지막 단계가 계산 측면에서 매우 심각한 경우 저장 직전에 병합(N) 마지막 단계를 두 개로 분할하기 위해 재분할(N)을 수행하는 것이 합리적입니다. 두 번째 하나는 spark.sql.shuffle.partitions에서 병렬로 스레드의 무거운 계산을 수행하고(예: 이전에 조인이 있었던 경우) 마지막 하나는 필요한 파일 수(N ) 자원 집약적인 계산 없이 이미. 여기서 무엇이 더 빠를지 생각해야 합니다. 모든 것을 그대로 두거나 repartition(N)을 추가하여 셔플을 수행합니다. 이 또한 무료는 아니지만 잠재적으로 복잡한 계산을 병렬화할 수 있습니다.
dataDF.repartition(1) .write .format("parquet") .mode("덮어쓰기") .option("압축","snappy") .save("/tst/problem_4/result")
무대 위의 블록 수와 출연자 수 사이의 관계를 파악했으므로 이제 작은 예를 들어 보겠습니다. 입력 단계에는 20개의 데이터 블록이 있고 10개의 실행자(5개의 실행자 * 2개의 코어)만 있습니다. 평균적으로 하나의 실행기에는 처리해야 하는 두 개의 데이터 블록이 있기 때문에 거의 모든 실행기가 한 블록을 처리한 후 처리를 위해 다른 블록을 만들어야 한다는 것을 알 수 있습니다. 그러나 한 단계의 모든 데이터 블록이 병렬로 처리될 수 있다는 점을 기억하고 작업을 위해 20개의 실행기를 요청합니다(5개의 실행기 * 4개의 코어). 각 실행기는 이제 하나의 블록만 처리하고 전체 단계의 시간은 이상적으로는 반으로 줄이십시오. 리소스 증가가 효과가 있고 속도가 증가하는 경우가 바로 이 경우입니다.
리소스 증가 – 더 빠르게 작동
그건 그렇고, 이전 단락에서 설명한대로 유지하면서 마지막 단계를 깨는 방법을 적용하는 흥미로운 점 중 하나는 다음과 같습니다.
dataDF.repartition(N).write. …
마지막 단계가 중단되기 전과 후의 지표를 비교하면 모든 것이 괜찮아 보입니다. 변환 시간이 여러 번 감소했습니다(마지막 계산이 모든 수행자에 의해 병렬로 수행되었기 때문에), Shuffle Spill이 사라졌습니다(이때 수행자가 메모리가 부족하고 로컬과 일종의 스왑을 주선합니다. 물론 이 경우 모든 데이터가 여러 개의 큰 블록으로 들어왔고 수행자는 이를 소화하는 데 어려움을 겪었습니다.
멈추다! 저장할 때 받은 파일의 크기를 자세히 살펴보겠습니다. 5.9GB에서 지금은 10.3GB, 레코드 수는 동일하고 데이터 구성은 동일합니다. 왜요? 그것은 연고에 파리입니다!
출력 크기에 주의
추가만 했습니다. 우리는 이미 repartition()이 데이터를 무작위로 배포한다는 것을 알아냈습니다. 즉, 마지막 셔플 이후에 키별로 부분적으로 정렬된 데이터 대신(우리의 경우 JOIN이 사용됨) 데이터를 무작위로 배포합니다. 쪽모이 세공 마루는 열 형식의 파일 저장 형식이며 그 안의 데이터는 열에서 부분적으로 정렬될 수 있다는 사실을 이용하여 압축됩니다. 행 분포에 임의성을 도입하여 데이터 압축성을 거의 두 배나 저하시킨 것으로 나타났습니다. 당신은 그것에 대해 무엇을 할 수 있습니까? 각 데이터 블록 내부에서 주문을 반환하는 것이 가능합니다.
dataDF.repartition(20). sortWithinPartitions(asc("id")).write. …
sortWithinPartitions() 함수는 각 블록 내의 필드 또는 여러 필드를 기준으로 정렬합니다. 여러 필드를 기준으로 정렬하기 위해 이 함수를 변환에 적용한 후 출력 파일의 전체 크기는 처음보다 약간 작아졌습니다. 이제 모든 것이 신속하게 작동하고 출력 파일의 크기가 적합합니다. 또한 이 경우 HDFS에 거의 동일한 크기의 파일을 기록했습니다(이는 결과적으로 repartition()). 이는 추가 처리에 편리할 수 있습니다.
Apache Spark 성능 최적화를 위한 옵티마이저
쪽모이 세공 형식 파일을 다루었으므로 Spark 최적화 프로그램이 술어 푸시다운 및 프로젝션 푸시다운과 같은 최적화 프로그램 규칙의 예에서 어떻게 작동하는지 볼 것입니다.
프로젝션 푸시다운의 경우 특히 기둥형 마루가 유리합니다. 쿼리 트리의 실제 실행은 액션이 수행될 때, 즉 데이터를 출력하는 연산이 수행될 때만 시작된다는 점을 상기시켜 드리겠습니다. 데이터베이스에서 파일 전송 등 ... 그렇게 함으로써 Spark는 쿼리 트리를 구축하고 최적화합니다. 따라서 쿼리를 작성할 때 옵티마이저는 결과를 얻는 데 필요한 필드를 이미 알고 있으며 파일에서 이러한 필드만 읽습니다. 열 형식 파일 형식에서는 데이터가 열 컨텍스트에 저장되므로 이러한 필드만 파일에서 읽습니다.
옵티마이저의 술어 푸시다운 규칙을 고려하십시오. 이 최적화의 원리는 매우 간단합니다. 우리는 많은 양의 데이터를 가지고 있으며 결국 유용하지 않으면 처리할 필요가 없습니다. 예를 들어 쿼리 트리 실행이 끝날 때 필터링해야 합니다. 최적화 프로그램은 파일(또는 예를 들어 RDBMS에 대한 쿼리)을 직접 읽기 전에 이상적으로는 데이터 소스에 더 가까운 낮은 수준으로 모든 조건과 필터를 낮추려고 합니다.
예를 들어 보겠습니다.
생성된 쿼리에 대한 물리적 실행 계획은 다음과 같습니다.
파일에서 직접 읽기 블록(FileScan parquet)과 PushedFilters 블록에 주의를 기울이겠습니다. 이는 파일을 물리적으로 읽는 동안 부과되는 조건입니다. 여기에 세 가지 조건이 있음을 알 수 있습니다.
-
ValueDatecondition IsNotNulland LessThanOrEqual-의 경우 후자의 경우 명확하며 이는 SQL에 반영됩니다. IsNotNull에서 어디에서 왔습니까? 요청 ValueDate <= 상수에 조건이 있고 NULL 값이 이 조건을 충족하지 않는 것이 분명합니다. 즉, 논리적으로 모든 것이 정확합니다. 그런데 왜 쪽모이 세공 마루 최적화 프로그램이 이 조건을 별도로 만들까요? 다음 단락에서 이에 대해 자세히 설명합니다.
-
SubjectID 조건 IsNotNull의 경우. 그러나 우리는 요청에 그러한 조건이 없으며 일반적으로 SubjectID에 대한 조건이 없습니다. 이 필드에는 테이블이 기본 테이블에 조인되는 LEFT JOIN만 있습니다. 예, 정확합니다. 이러한 JOIN을 사용하면 SubjectID가 NULL인 모든 행이 결과 선택 항목에 포함되지 않습니다. 옵티마이저가 이를 고려하고 처음에는 파일에서 이러한 행을 읽지 않는다는 것을 알 수 있습니다.
옵티마이저가 별도로 추가하는 IsNotNull 조건에 대해 여전히 흥미로운 점을 알아보겠습니다. 이를 위해 parquet 파일의 구조를 살펴보겠습니다. 당신이 사용할 수있는 이를 위한 쪽모이 세공 도구. 문제는 parquet 파일이 스키마와 함께 행 그룹 컨텍스트의 필드에 대한 일부 통계도 저장한다는 것입니다.
내부 쪽모이 세공 파일
모든 정수 유형에 대해 값의 수( Values), 이 블록의 NULL 수( Null Values), 이 행 그룹에 있는 열의 최소 및 최대 값이 있음을 알 수 있습니다. IsNotNull 필드에 대한 조건을 즉시 기억합니다. 즉, 이 그룹의 SubjectID 필드에 Values = Null Values가 있으면 이 행 그룹의 모든 값이 NULL이고 이 블록을 전혀 읽지 않는다는 결론을 내릴 수 있습니다. 더 많이, 더 적게, 같음 조건에도 동일하게 적용됩니다. 여기에서 열의 최소 및 최대 값을 사용하고 이 행 그룹을 읽어야 하는지 여부에 관계없이 결론을 도출할 수 있습니다.
쿼리 실행을 시작하기 전에 미리 알고 있는 경우에만 조건을 아래 수준으로 낮출 수 있다는 점을 이해하는 것이 중요합니다.
Apache Spark 성능 최적화의 실제 예
쪽모이 세공 파일 디렉토리는 필드별로 분할됩니다. 이 필드에 대한 필터 값이 있는 문자열이 쉼표로 구분되어 변환에 전달되었습니다. 개발자는 폭발(split(filter)), 즉 값이 있는 이 행의 작은 테이블과 깨끗한 양심으로 필터링해야 하는 기본 테이블과 INNER JOIN을 만들었습니다. 변환은 느리게 작동했습니다. 쿼리 계획을 살펴보겠습니다.
모든 파티션은 HDFS에서 읽습니다.
이상하지만 첫 번째 단계에서 Spark는 모든 파티션을 뺍니다( PartitionCount = 121). 단 하나의 값으로 구성된 필터를 전달하더라도. 이것은 쿼리 트리를 작성할 때 Spark가 필터에 대해 전혀 알지 못하는 경우입니다. JOIN 뒤에 숨겨져 있기 때문입니다.
필터 값으로 테이블을 작성하는 대신 표준 Spark SQL 함수를 사용합니다. find_in_set(). 쉼표로 구분된 목록인 문자열에서 하위 문자열의 위치를 찾습니다.
즉, 필터는 이제 다음과 같은 간단한 표현식을 나타냅니다. 여기서 find_in_set(surveyprogectid, )
그리고 쿼리 실행 계획을 보면 옵티마이저는 빌드할 때 이미 필터 문자열과 조건을 알고 있기 때문에 이 조건을 파일에서 읽는 수준으로 낮춥니다. 또한 이것이 파티셔닝 필드임을 알고 파티션 프루닝 규칙을 적용합니다. 즉, 필터와 일치하지 않는 파티션을 고려 대상에서 제외합니다.
HDFS에서 하나의 파티션만 읽습니다.
우리의 조건은 이제 PartitionFilters 블록에 있습니다. 필드가 분할 중이기 때문에 HDFS( PartitionCount = 1)에서 필요한 파티션만 뺍니다.
따라서 분할이 있는 큰 테이블이 있고 JOIN을 통해 일부 파티션을 선택하는 경우 이 필터링된 테이블에서 값 목록을 문자열로 구성하고 상수로 전달하는 별도의 작업을 구성하는 것이 더 나을 수 있습니다. 메인 쿼리의 조건.
Apache Spark Optimizer의 문제점
옵티마이저의 훌륭한 작업 ... 그러나 때로는 가능한 한 조건을 소스로 가져오는 경향이 해로울 수 있습니다. UDF(사용자 정의 기능 ) 장면에 들어갑니다. 사용자 정의 함수는 Spark 옵티마이저의 블랙박스입니다.
다음 예제를 고려하십시오.
수십억 줄의 큰 파일이 있습니다. 고유한 ID만 선택하고 UDF를 적용한 다음 Null이 되는 결과만 선택하려고 합니다. 요청 순서:
T1=> 고르다 고유 ID 에 T T2=> 고르다 UDF(ID) as 새로운 아이디 에 T1 T3=> 고르다 * 에 T2 어디에 새로운 아이디 is null로
테이블에는 수천 개의 고유 id 값만 있으며 UDF는 빠르게 작동하지 않고 HBase로 이동합니다. 즉, 이러한 쿼리 트리를 구축한 우리는 UDF가 수천 번 호출될 것으로 예상합니다. 우리는 요청을 시작하고 오랫동안 기다립니다.
쿼리 실행 계획을 살펴보겠습니다.
UDF로 상태가 거의 최저 수준으로 떨어졌습니다.
… 오! 옵티마이저는 최선을 다했습니다. 파일을 직접 읽은 직후, 유일한 id만 선택하는 순간까지 조건 isNull(UDF(id))를 정직하게 낮췄습니다. 이것은 우리의 무거운 UDF가 수천 번이 아닌 수십억 번을 시도했음을 의미합니다.
여기서 무엇을 생각할 수 있습니까? 예를 들어 고유 ID(T1)를 계산한 후 캐시(지속)를 수행합니다. 또는 옵티마이저가 조건을 더 이상 통과하지 않는 측면 보기를 사용합니다.
고르다 udf_res as 새로운 아이디 에 T1 옆의 전망 폭발 (배열(UDF(ID))) as udf_res
처음에 원하는 것을 얻었습니다. UDF는 고유 ID에 대해서만 계산됩니다.
결론
이 기사의 범위를 벗어나는 JOIN 최적화와 관련된 질문이 있습니다. 브로드캐스트, 데이터 왜곡, 병합 및 재분할의 장단점이 있습니다. 여기에서는 일부 사항이 충분히 자세히 설명되어 있고 일부는 그렇지 않습니다. 그래서, 방문 애널리틱스Vidhya 이를 이용하여 고도로 최적화된 성능을 제공합니다. 같은 내용은 첨부된 스크린샷을 참조하세요!
참조 :
이미지 1: https://mallikarjuna_g.gitbooks.io/spark/content/diagrams/spark-rdds.png
Image 2: https://media.springernature.com/lw685/springer-static/image/art%3A10.1007%2Fs11227-019-03093-0/MediaObjects/11227_2019_3093_Fig1_HTML.png
이미지 3: https://platoaistream.net/wp-content/uploads/2021/09/apache-spark-performance-optimization-for-data-engineers-3.png
- "
- 000
- 9
- 계정
- 동작
- 이점
- All
- 중
- 분석
- 아파치
- 아파치 스파크
- 앱
- 어플리케이션
- 어플리케이션
- 기사
- 기초
- BEST
- 빅 데이터
- 억원
- 검정
- 보물상자
- 건물
- 전화
- 자세히
- 암호
- 단
- 소비
- 데이터
- 데이터 처리
- 데이터 세트
- 데이터베이스
- 거래
- 세부 묘사
- 개발자
- 개발자
- DID
- 중단 시간
- 운전사
- 엔지니어
- 들어갑니다
- 등
- 교환
- 실행
- FAST
- Fields
- 그림
- 필터
- 끝
- 먼저,
- 초점
- 형태
- 체재
- 뼈대
- 무료
- 장난
- 기능
- 일반
- 큰
- 초록색
- 그룹
- 여기에서 지금 확인해 보세요.
- 높은
- history
- 방법
- How To
- HTTPS
- 증가
- IT
- 일
- 어울리다
- 키
- 지식
- 넓은
- 시작
- 리드
- 배운
- 레벨
- 라인
- 명부
- 지방의
- 긴
- 경기
- 미디어
- 매질
- 가장 인기 많은
- 네트워크
- 노드
- 주문
- 기타
- 기타
- 무늬
- 지불
- 성능
- 물리적
- .
- 인기 문서
- 프로그램
- 프로젝트
- 읽기
- 현실
- 기록
- 신고
- 자료
- REST
- 결과
- 달리기
- 달리는
- 절약
- 과학
- 감각
- 세트
- 단순, 간단, 편리
- 크기
- 작은
- So
- 속도
- 분열
- SQL
- 단계
- 스타트
- 통계
- 저장
- 상점
- 말하다
- test
- 기초
- 블록
- 소스
- 시간
- 변환
- 보편적 인
- us
- 가치
- 관측
- 기다리다
- 웹
- 웹 서버
- 이내
- 작업
- 근로자
- 일
- 쓰기