Amazon EMR サーバーレスは、より大きなワーカー サイズをサポートして、より多くのコンピューティングとメモリ集約型のワークロードを実行します

Amazon EMR サーバーレスは、より大きなワーカー サイズをサポートして、より多くのコンピューティングとメモリ集約型のワークロードを実行します

ソースノード: 1960092

Amazon EMR サーバーレス クラスターやサーバーを管理することなく、Apache Spark や Apache Hive などのオープンソースのビッグ データ フレームワークを実行できます。 EMR サーバーレスを使用すると、変化するデータ量と処理要件に合わせて数秒でリソースのサイズを変更する自動スケーリングを使用して、あらゆる規模で分析ワークロードを実行できます。 EMR サーバーレスは、リソースを自動的にスケールアップおよびスケールダウンして、アプリケーションに適切な量の容量を提供します。

EMR サーバーレスは、最大 8 GB のメモリを備えた 60 個の vCPU と最大 16 GB のメモリを備えた 120 個の vCPU のワーカー構成を提供するようになりました。これにより、EMR サーバーレスでより多くのコンピューティングとメモリ集約型のワークロードを実行できるようになります。 EMR サーバーレス アプリケーションは、ワーカーを内部的に使用してワークロードを実行します。 また、ワークロードの要件に基づいてさまざまなワーカー構成を構成できます。 以前は、EMR サーバーレスで利用可能な最大のワーカー構成は、最大 4 GB のメモリを備えた 30 つの vCPU でした。 この機能は、次の一般的なシナリオで特に役立ちます。

  • シャッフル負荷の高いワークロード
  • メモリ集約型のワークロード

これらの各ユース ケースと、より大きなワーカー サイズを持つことの利点を見てみましょう。

シャッフルが集中するワークロードに大規模なワーカーを使用する利点

Spark と Hive では、計算中にデータをクラスター全体に再分散する必要がある場合にシャッフルが発生します。 アプリケーションが幅広い変換を実行したり、次のような操作を減らしたりする場合 join, groupBy, sortByまたは repartition、Spark および Hive がシャッフルをトリガーします。 また、すべての Spark ステージと Tez 頂点は、シャッフル操作によって制限されます。 Spark を例にとると、デフォルトでは、Spark ジョブごとに 200 のパーティションが定義されています。 spark.sql.shuffle.partitions. ただし、Spark は、データ サイズと実行中の操作に基づいて、オンザフライでタスクの数を計算します。 大規模なデータセットに対して幅広い変換が実行される場合、すべてのタスクで取得する必要がある GB 単位または TB 単位のデータが存在する可能性があります。

通常、シャッフルは時間とリソースの両方の面でコストがかかり、パフォーマンスのボトルネックにつながる可能性があります。 したがって、シャッフルを最適化すると、Spark ジョブのパフォーマンスとコストに大きな影響を与える可能性があります。 大規模なワーカーを使用すると、各エグゼキューターのメモリにより多くのデータを割り当てることができるため、エグゼキューター間でシャッフルされるデータが最小限に抑えられます。 これにより、同じワーカーからローカルにフェッチされるデータが増え、他のワーカーからリモートでフェッチされるデータが減るため、シャッフル読み取りのパフォーマンスが向上します。

実験

シャッフルを多用するクエリに大規模なワーカーを使用する利点を示すために、使用してみましょう q78 これは、167 の Spark ステージで 12 GB のデータをシャッフルする、シャッフルの多い Spark クエリです。 同じクエリを異なる構成で XNUMX 回繰り返してみましょう。

テスト 1 の構成は次のとおりです。

  • EMR サーバーレス アプリケーションの作成中に要求されたエグゼキュータのサイズ = 4 vCPU、8 GB メモリ、200 GB ディスク
  • Spark ジョブ構成:
    • spark.executor.cores = 4
    • spark.executor.memory = 8
    • spark.executor.instances = 48
    • 並列度 = 192 (spark.executor.instances * spark.executor.cores)

テスト 2 の構成は次のとおりです。

  • EMR サーバーレス アプリケーションの作成中に要求されたエグゼキュータのサイズ = 8 vCPU、16 GB メモリ、200 GB ディスク
  • Spark ジョブ構成:
    • spark.executor.cores = 8
    • spark.executor.memory = 16
    • spark.executor.instances = 24
    • 並列度 = 192 (spark.executor.instances * spark.executor.cores)

設定で動的割り当ても無効にしましょう spark.dynamicAllocation.enabled 〜へ false 両方のテストで、エグゼキューターの起動時間が可変であることによる潜在的なノイズを回避し、両方のテストでリソースの使用率を一定に保つことができます。 を使用しております 火花対策は、Spark パフォーマンス メトリックの収集と分析を簡素化するオープン ソース ツールです。 固定数のエグゼキュータを使用しているため、vCPU の総数と要求されるメモリは両方のテストで同じです。 次の表は、Spark Measure で収集されたメトリックからの観測結果をまとめたものです。

. クエリにかかった合計時間 (ミリ秒) シャッフルローカルブロックフェッチ shuffleRemoteBlocksFetched シャッフルローカルバイト読み取り shuffleRemoteBytesRead シャッフルフェッチ待機時間 シャッフル書き込み時間
テスト1 153244 114175 5291825 3.5 GB 163.1 GB 1.9時間 4.7分
テスト2 108136 225448 5185552 6.9 GB 159.7 GB 3.2分 5.2分

表からわかるように、シャッフルの改良により性能に大きな差が出ています。 テスト 2 は、テスト 1 の 29.44 倍のエグゼキューターの数を半分にして実行が 1.97% 速くなり、同じクエリ、同じ並列処理、同じ集計 vCPU およびメモリ リソースのテスト 1 と比較して、XNUMX 倍のシャッフル データがローカルにフェッチされました。 . したがって、大規模なエグゼキューターの助けを借りて、コストやジョブの並列処理を犠牲にすることなく、パフォーマンスを向上させることができます。 次のようなシャッフルを多用する他の TPC-DS クエリでも、同様のパフォーマンス上の利点が見られます。 q23a および q23b.

提言

大規模なワーカーがシャッフル集約型の Spark アプリケーションに役立つかどうかを判断するには、次の点を考慮してください。

  • チェック インターンシップ EMR サーバーレス アプリケーションの Spark History Server UI のタブ。 たとえば、次の Spark History Server のスクリーンショットから、この Spark ジョブが 167 ステージにわたって集約された 12 GB のシャッフル データを読み書きしたことを確認できます。 シャッフル読み取り および シャッフル書き込み 列。 ジョブが 50 GB を超えるデータをシャッフルする場合、8 または 16 vCPU または spark.executor.cores.

  • チェック SQL/データフレーム EMR サーバーレス アプリケーションの Spark History Server UI のタブ (Dataframe および Dataset API のみ)。 collect、take、showString、save など、実行される Spark アクションを選択すると、交換によって分離されたすべてのステージの集約された DAG が表示されます。 DAG 内のすべての交換はシャッフル操作に対応し、次のスクリーンショットに示すように、シャッフルされたローカルおよびリモートのバイトとブロックが含まれます。 フェッチされたローカル シャッフル ブロックまたはバイト数が、リモート ブロックまたはフェッチされたバイト数と比較してはるかに少ない場合は、より大きなワーカー (8 または 16 個の vCPU または spark.executor.core を使用) でアプリケーションを再実行し、これらの交換メトリクスを DAG で確認して、改善があるかどうかを確認します。

  • 火花対策 ツールを Spark クエリで使用して、Spark ドライバーのシャッフル メトリックを取得します。 stdout 次の Spark ジョブのログに示すように、ログ。 シャッフル読み取りにかかった時間を確認します (shuffleFetchWaitTime) およびシャッフル書き込み (shuffleWriteTime)、およびフェッチされたリモート バイトに対するフェッチされたローカル バイトの比率。 シャッフル操作に 2 分以上かかる場合は、より大きなワーカー (8 または 16 vCPU または spark.executor.cores) を Spark Measure と共に使用して、シャッフル パフォーマンスと全体的なジョブ実行時間の改善を追跡します。
Time taken: 177647 ms Scheduling mode = FIFO
Spark Context default degree of parallelism = 192 Aggregated Spark stage metrics:
numStages => 22
numTasks => 10156
elapsedTime => 159894 (2.7 min)
stageDuration => 456893 (7.6 min)
executorRunTime => 28418517 (7.9 h)
executorCpuTime => 20276736 (5.6 h)
executorDeserializeTime => 326486 (5.4 min)
executorDeserializeCpuTime => 124323 (2.1 min)
resultSerializationTime => 534 (0.5 s)
jvmGCTime => 648809 (11 min)
shuffleFetchWaitTime => 340880 (5.7 min)
shuffleWriteTime => 245918 (4.1 min)
resultSize => 23199434 (22.1 MB)
diskBytesSpilled => 0 (0 Bytes)
memoryBytesSpilled => 0 (0 Bytes)
peakExecutionMemory => 1794288453176
recordsRead => 18696929278
bytesRead => 77354154397 (72.0 GB)
recordsWritten => 0
bytesWritten => 0 (0 Bytes)
shuffleRecordsRead => 14124240761
shuffleTotalBlocksFetched => 5571316
shuffleLocalBlocksFetched => 117321
shuffleRemoteBlocksFetched => 5453995
shuffleTotalBytesRead => 158582120627 (147.7 GB)
shuffleLocalBytesRead => 3337930126 (3.1 GB)
shuffleRemoteBytesRead => 155244190501 (144.6 GB)
shuffleRemoteBytesReadToDisk => 0 (0 Bytes)
shuffleBytesWritten => 156913371886 (146.1 GB)
shuffleRecordsWritten => 13867102620

メモリ集約型のワークロードに大規模なワーカーを使用する利点

特定の種類のワークロードはメモリを集中的に使用するため、ワーカーごとに構成されたメモリを増やすことでメリットが得られる場合があります。 このセクションでは、大規模なワーカーがメモリ集約型ワークロードの実行に役立つ一般的なシナリオについて説明します。

データの偏り

データの偏りは、通常、いくつかの種類のデータセットで発生します。 一般的な例として、不正行為の検出、人口分析、所得分配などがあります。 たとえば、データの異常を検出する場合、異常なデータは 1% 未満であると予想されます。 正常なレコードと異常なレコードに加えて集計を実行する場合、データの 99% が単一のワーカーによって処理されるため、そのワーカーのメモリが不足する可能性があります。 次のようなメモリ集約型の変換では、データ スキューが観察される場合があります。 groupBy, orderBy, join、ウィンドウ関数、 collect_list, collect_set、 等々。 などの結合タイプ BroadcastNestedLoopJoin およびデカルト積も、本質的にメモリ集約型であり、データ スキューの影響を受けやすくなります。 同様に、入力データが Gzip 圧縮されている場合、Gzip 圧縮タイプは分割できないため、XNUMX つの Gzip ファイルを複数のタスクで読み取ることはできません。 入力にいくつかの非常に大きな Gzip ファイルがある場合、XNUMX つのタスクが実行プログラムのメモリに収まらない巨大な Gzip ファイルを読み取らなければならない可能性があるため、ジョブがメモリ不足になる可能性があります。

データの偏りによる障害は、ソルティングなどの戦略を適用することで軽減できます。 ただし、これには多くの場合、コードへの大幅な変更が必要であり、受信データ量の突然の急増によって前例のないデータ スキューが原因で失敗した本番環境のワークロードには適していない可能性があります。 より簡単な回避策として、ワーカー メモリを増やしてください。 より大きなワーカーをより多く使用する spark.executor.memory アプリケーション コードを変更せずにデータ スキューを処理できます。

キャッシング

パフォーマンスを向上させるために、Spark ではデータ フレーム、データセット、および RDD をメモリにキャッシュできます。 これにより、再計算することなく、アプリケーションでデータ フレームを複数回再利用できます。 デフォルトでは、executor の JVM の最大 50% が、データ フレームのキャッシュに使用されます。 property spark.memory.storageFraction。 たとえば、 spark.executor.memory が 30 GB に設定されている場合、エビクションの影響を受けないキャッシュ ストレージに 15 GB が使用されます。

キャッシュ操作のデフォルトのストレージ レベルは DISK_AND_MEMORY. キャッシュしようとしているデータ フレームのサイズがエグゼキュータのメモリに収まらない場合、キャッシュの一部がディスクに流出します。 キャッシュされたデータをディスクに書き込むための十分なスペースがない場合、ブロックは削除され、キャッシュの利点は得られません。 より大きなワーカーを使用すると、より多くのデータをメモリにキャッシュできるようになり、基になるストレージではなくメモリからキャッシュされたブロックを取得することでジョブのパフォーマンスが向上します。

実験

たとえば、次の PySpark ジョブ 99.95 つのエグゼキュータがデータの XNUMX% を処理し、次のようなメモリ集約型の集計でスキューが発生します。 collect_list. このジョブは、非常に大きなデータ フレーム (2.2 TB) もキャッシュします。 次の vCPU とメモリ構成を使用して、EMR サーバーレスで同じジョブを XNUMX 回実行してみましょう。

以前に可能な最大のワーカー構成でテスト 3 を実行してみましょう。

  • EMR サーバーレス アプリケーションの作成中に設定されたエグゼキュータのサイズ = 4 vCPU、30 GB メモリ、200 GB ディスク
  • Spark ジョブ構成:
    • spark.executor.cores = 4
    • spark.executor.memory = 27G

新しくリリースされた大規模なワーカー構成でテスト 4 を実行してみましょう。

  • EMR サーバーレス アプリケーションの作成中に設定された executor のサイズ = 8 vCPU、60 GB メモリ、200 GB ディスク
  • Spark ジョブ構成:
    • spark.executor.cores = 8
    • spark.executor.memory = 54G

テスト 3 は失敗しました FetchFailedExceptionこれは、エグゼキューターのメモリがジョブに対して十分でないために発生しました。

また、テスト 3 の Spark UI から、エグゼキュータの予約ストレージ メモリがデータ フレームのキャッシュに完全に利用されていることがわかります。

エグゼキュータの stderr ログ:

23/02/06 16:06:58 INFO MemoryStore: Will not store rdd_4_1810
23/02/06 16:06:58 WARN MemoryStore: Not enough space to cache rdd_4_1810 in memory! (computed 134.1 MiB so far)
23/02/06 16:06:58 INFO MemoryStore: Memory use = 14.8 GiB (blocks) + 507.5 MiB (scratch space shared across 4 tasks(s)) = 15.3 GiB. Storage limit = 15.3 GiB.
23/02/06 16:06:58 WARN BlockManager: Persisting block rdd_4_1810 to disk instead.

永続化されたデータ フレームの約 33% がディスクにキャッシュされました。 Storage Spark UI のタブ。

より大きなエグゼキューターと仮想コアを使用したテスト 4 は、メモリ関連のエラーをスローすることなく正常に実行されました。 また、データ フレームの約 2.2% のみがディスクにキャッシュされました。 したがって、データ フレームのキャッシュされたブロックは、ディスクではなくメモリから取得されるため、パフォーマンスが向上します。

提言

大規模なワーカーがメモリ集約型の Spark アプリケーションに役立つかどうかを判断するには、次の点を考慮してください。

  • Spark UI を見て、Spark アプリケーションにデータの偏りがあるかどうかを判断します。 次の Spark UI のスクリーンショットは、145.2 つのタスクがほとんどのデータ (XNUMX GB) を処理するデータ スキュー シナリオの例を示しています。 シャッフル読み取り サイズ。 60 つまたは少数のタスクが他のタスクよりも大幅に多くのデータを処理する場合は、120 ~ XNUMX G のメモリを備えたより大きなワーカーでアプリケーションを再実行します (spark.executor.memory の 54% を考慮して 109 ~ 10 GB の任意の場所に設定します。 spark.executor.memoryOverhead).

  • チェック Storage Spark History Server のタブを使用して、メモリにキャッシュされたデータとディスクの比率を確認します。 メモリ内のサイズ および ディスクのサイズ 列。 データの 10% 以上がディスクにキャッシュされている場合は、より大きなワーカーでアプリケーションを再実行して、メモリにキャッシュされるデータの量を増やします。
  • ジョブがより多くのメモリを必要とするかどうかを事前に判断するもう XNUMX つの方法は、監視することです。 ピーク時の JVM メモリ Spark UI で 執行者 タブ。 使用されるピーク JVM メモリがエグゼキュータまたはドライバ メモリに近い場合は、より大きなワーカーを使用してアプリケーションを作成し、より高い値を設定できます。 spark.executor.memory or spark.driver.memory. たとえば、次のスクリーンショットでは、ピーク時の JVM メモリ使用量の最大値は 26 GB であり、 spark.executor.memory は 27 G に設定されています。この場合、60 GB のメモリを備えたより大きなワーカーを使用し、 spark.executor.memory 54Gに設定。

考慮事項

大規模な vCPU はシャッフル ブロックの局所性を高めるのに役立ちますが、ディスク スループット、ディスク IOPS (XNUMX 秒あたりの入出力操作数)、ネットワーク帯域幅など、他の要因も関係します。 場合によっては、より多くのディスクを備えた小規模なワーカーを増やすと、大規模なワーカーが少ない場合と比較して、ディスク IOPS、スループット、およびネットワーク帯域幅全体が高くなる可能性があります。 ワークロードに最適な構成を選択するために、適切な vCPU 構成に対してワークロードをベンチマークすることをお勧めします。

シャッフルが多いジョブの場合は、大きなディスクを使用することをお勧めします。 アプリケーションの作成時に、各ワーカーに最大 200 GB のディスクを接続できます。 大規模な vCPU の使用 (spark.executor.cores) executor ごとに使用すると、各ワーカーのディスク使用率が増加する可能性があります。 シャッフル データをディスクに収めることができないためにアプリケーションが「デバイスに空き領域がありません」で失敗する場合は、200 GB ディスクでより小さなワーカーを使用してください。

まとめ

この投稿では、EMR サーバーレス ジョブに大規模なエグゼキューターを使用する利点について学びました。 さまざまなワーカー構成の詳細については、次を参照してください。 ワーカー構成. 大規模なワーカー構成は、EMR サーバーレスが利用可能なすべてのリージョンで利用できます 利用できます.


著者について

ヴィーナ・ヴァスデバン AWS のシニア パートナー ソリューション アーキテクトであり、ビッグデータと分析を専門とする Amazon EMR スペシャリストです。 彼女は、顧客とパートナーが高度に最適化され、スケーラブルで安全なソリューションを構築するのを支援しています。 アーキテクチャを近代化します。 ビッグデータ ワークロードを AWS に移行します。

タイムスタンプ:

より多くの AWSビッグデータ