Amazon EMR サーバーレス クラスターやサーバーを管理することなく、Apache Spark や Apache Hive などのオープンソースのビッグ データ フレームワークを実行できます。 EMR サーバーレスを使用すると、変化するデータ量と処理要件に合わせて数秒でリソースのサイズを変更する自動スケーリングを使用して、あらゆる規模で分析ワークロードを実行できます。 EMR サーバーレスは、リソースを自動的にスケールアップおよびスケールダウンして、アプリケーションに適切な量の容量を提供します。
EMR サーバーレスは、最大 8 GB のメモリを備えた 60 個の vCPU と最大 16 GB のメモリを備えた 120 個の vCPU のワーカー構成を提供するようになりました。これにより、EMR サーバーレスでより多くのコンピューティングとメモリ集約型のワークロードを実行できるようになります。 EMR サーバーレス アプリケーションは、ワーカーを内部的に使用してワークロードを実行します。 また、ワークロードの要件に基づいてさまざまなワーカー構成を構成できます。 以前は、EMR サーバーレスで利用可能な最大のワーカー構成は、最大 4 GB のメモリを備えた 30 つの vCPU でした。 この機能は、次の一般的なシナリオで特に役立ちます。
- シャッフル負荷の高いワークロード
- メモリ集約型のワークロード
これらの各ユース ケースと、より大きなワーカー サイズを持つことの利点を見てみましょう。
シャッフルが集中するワークロードに大規模なワーカーを使用する利点
Spark と Hive では、計算中にデータをクラスター全体に再分散する必要がある場合にシャッフルが発生します。 アプリケーションが幅広い変換を実行したり、次のような操作を減らしたりする場合 join
, groupBy
, sortBy
または repartition
、Spark および Hive がシャッフルをトリガーします。 また、すべての Spark ステージと Tez 頂点は、シャッフル操作によって制限されます。 Spark を例にとると、デフォルトでは、Spark ジョブごとに 200 のパーティションが定義されています。 spark.sql.shuffle.partitions
. ただし、Spark は、データ サイズと実行中の操作に基づいて、オンザフライでタスクの数を計算します。 大規模なデータセットに対して幅広い変換が実行される場合、すべてのタスクで取得する必要がある GB 単位または TB 単位のデータが存在する可能性があります。
通常、シャッフルは時間とリソースの両方の面でコストがかかり、パフォーマンスのボトルネックにつながる可能性があります。 したがって、シャッフルを最適化すると、Spark ジョブのパフォーマンスとコストに大きな影響を与える可能性があります。 大規模なワーカーを使用すると、各エグゼキューターのメモリにより多くのデータを割り当てることができるため、エグゼキューター間でシャッフルされるデータが最小限に抑えられます。 これにより、同じワーカーからローカルにフェッチされるデータが増え、他のワーカーからリモートでフェッチされるデータが減るため、シャッフル読み取りのパフォーマンスが向上します。
実験
シャッフルを多用するクエリに大規模なワーカーを使用する利点を示すために、使用してみましょう q78 これは、167 の Spark ステージで 12 GB のデータをシャッフルする、シャッフルの多い Spark クエリです。 同じクエリを異なる構成で XNUMX 回繰り返してみましょう。
テスト 1 の構成は次のとおりです。
- EMR サーバーレス アプリケーションの作成中に要求されたエグゼキュータのサイズ = 4 vCPU、8 GB メモリ、200 GB ディスク
- Spark ジョブ構成:
spark.executor.cores
= 4spark.executor.memory
= 8spark.executor.instances
= 48- 並列度 = 192 (
spark.executor.instances
*spark.executor.cores
)
テスト 2 の構成は次のとおりです。
- EMR サーバーレス アプリケーションの作成中に要求されたエグゼキュータのサイズ = 8 vCPU、16 GB メモリ、200 GB ディスク
- Spark ジョブ構成:
spark.executor.cores
= 8spark.executor.memory
= 16spark.executor.instances
= 24- 並列度 = 192 (
spark.executor.instances
*spark.executor.cores
)
設定で動的割り当ても無効にしましょう spark.dynamicAllocation.enabled
〜へ false
両方のテストで、エグゼキューターの起動時間が可変であることによる潜在的なノイズを回避し、両方のテストでリソースの使用率を一定に保つことができます。 を使用しております 火花対策は、Spark パフォーマンス メトリックの収集と分析を簡素化するオープン ソース ツールです。 固定数のエグゼキュータを使用しているため、vCPU の総数と要求されるメモリは両方のテストで同じです。 次の表は、Spark Measure で収集されたメトリックからの観測結果をまとめたものです。
. | クエリにかかった合計時間 (ミリ秒) | シャッフルローカルブロックフェッチ | shuffleRemoteBlocksFetched | シャッフルローカルバイト読み取り | shuffleRemoteBytesRead | シャッフルフェッチ待機時間 | シャッフル書き込み時間 |
テスト1 | 153244 | 114175 | 5291825 | 3.5 GB | 163.1 GB | 1.9時間 | 4.7分 |
テスト2 | 108136 | 225448 | 5185552 | 6.9 GB | 159.7 GB | 3.2分 | 5.2分 |
表からわかるように、シャッフルの改良により性能に大きな差が出ています。 テスト 2 は、テスト 1 の 29.44 倍のエグゼキューターの数を半分にして実行が 1.97% 速くなり、同じクエリ、同じ並列処理、同じ集計 vCPU およびメモリ リソースのテスト 1 と比較して、XNUMX 倍のシャッフル データがローカルにフェッチされました。 . したがって、大規模なエグゼキューターの助けを借りて、コストやジョブの並列処理を犠牲にすることなく、パフォーマンスを向上させることができます。 次のようなシャッフルを多用する他の TPC-DS クエリでも、同様のパフォーマンス上の利点が見られます。 q23a および q23b.
提言
大規模なワーカーがシャッフル集約型の Spark アプリケーションに役立つかどうかを判断するには、次の点を考慮してください。
- チェック インターンシップ EMR サーバーレス アプリケーションの Spark History Server UI のタブ。 たとえば、次の Spark History Server のスクリーンショットから、この Spark ジョブが 167 ステージにわたって集約された 12 GB のシャッフル データを読み書きしたことを確認できます。 シャッフル読み取り および シャッフル書き込み 列。 ジョブが 50 GB を超えるデータをシャッフルする場合、8 または 16 vCPU または
spark.executor.cores
.
- チェック SQL/データフレーム EMR サーバーレス アプリケーションの Spark History Server UI のタブ (Dataframe および Dataset API のみ)。 collect、take、showString、save など、実行される Spark アクションを選択すると、交換によって分離されたすべてのステージの集約された DAG が表示されます。 DAG 内のすべての交換はシャッフル操作に対応し、次のスクリーンショットに示すように、シャッフルされたローカルおよびリモートのバイトとブロックが含まれます。 フェッチされたローカル シャッフル ブロックまたはバイト数が、リモート ブロックまたはフェッチされたバイト数と比較してはるかに少ない場合は、より大きなワーカー (8 または 16 個の vCPU または spark.executor.core を使用) でアプリケーションを再実行し、これらの交換メトリクスを DAG で確認して、改善があるかどうかを確認します。
- 火花対策 ツールを Spark クエリで使用して、Spark ドライバーのシャッフル メトリックを取得します。
stdout
次の Spark ジョブのログに示すように、ログ。 シャッフル読み取りにかかった時間を確認します (shuffleFetchWaitTime
) およびシャッフル書き込み (shuffleWriteTime
)、およびフェッチされたリモート バイトに対するフェッチされたローカル バイトの比率。 シャッフル操作に 2 分以上かかる場合は、より大きなワーカー (8 または 16 vCPU またはspark.executor.cores
) を Spark Measure と共に使用して、シャッフル パフォーマンスと全体的なジョブ実行時間の改善を追跡します。
メモリ集約型のワークロードに大規模なワーカーを使用する利点
特定の種類のワークロードはメモリを集中的に使用するため、ワーカーごとに構成されたメモリを増やすことでメリットが得られる場合があります。 このセクションでは、大規模なワーカーがメモリ集約型ワークロードの実行に役立つ一般的なシナリオについて説明します。
データの偏り
データの偏りは、通常、いくつかの種類のデータセットで発生します。 一般的な例として、不正行為の検出、人口分析、所得分配などがあります。 たとえば、データの異常を検出する場合、異常なデータは 1% 未満であると予想されます。 正常なレコードと異常なレコードに加えて集計を実行する場合、データの 99% が単一のワーカーによって処理されるため、そのワーカーのメモリが不足する可能性があります。 次のようなメモリ集約型の変換では、データ スキューが観察される場合があります。 groupBy
, orderBy
, join
、ウィンドウ関数、 collect_list
, collect_set
、 等々。 などの結合タイプ BroadcastNestedLoopJoin
およびデカルト積も、本質的にメモリ集約型であり、データ スキューの影響を受けやすくなります。 同様に、入力データが Gzip 圧縮されている場合、Gzip 圧縮タイプは分割できないため、XNUMX つの Gzip ファイルを複数のタスクで読み取ることはできません。 入力にいくつかの非常に大きな Gzip ファイルがある場合、XNUMX つのタスクが実行プログラムのメモリに収まらない巨大な Gzip ファイルを読み取らなければならない可能性があるため、ジョブがメモリ不足になる可能性があります。
データの偏りによる障害は、ソルティングなどの戦略を適用することで軽減できます。 ただし、これには多くの場合、コードへの大幅な変更が必要であり、受信データ量の突然の急増によって前例のないデータ スキューが原因で失敗した本番環境のワークロードには適していない可能性があります。 より簡単な回避策として、ワーカー メモリを増やしてください。 より大きなワーカーをより多く使用する spark.executor.memory
アプリケーション コードを変更せずにデータ スキューを処理できます。
キャッシング
パフォーマンスを向上させるために、Spark ではデータ フレーム、データセット、および RDD をメモリにキャッシュできます。 これにより、再計算することなく、アプリケーションでデータ フレームを複数回再利用できます。 デフォルトでは、executor の JVM の最大 50% が、データ フレームのキャッシュに使用されます。 property spark.memory.storageFraction
。 たとえば、 spark.executor.memory
が 30 GB に設定されている場合、エビクションの影響を受けないキャッシュ ストレージに 15 GB が使用されます。
キャッシュ操作のデフォルトのストレージ レベルは DISK_AND_MEMORY
. キャッシュしようとしているデータ フレームのサイズがエグゼキュータのメモリに収まらない場合、キャッシュの一部がディスクに流出します。 キャッシュされたデータをディスクに書き込むための十分なスペースがない場合、ブロックは削除され、キャッシュの利点は得られません。 より大きなワーカーを使用すると、より多くのデータをメモリにキャッシュできるようになり、基になるストレージではなくメモリからキャッシュされたブロックを取得することでジョブのパフォーマンスが向上します。
実験
たとえば、次の PySpark ジョブ 99.95 つのエグゼキュータがデータの XNUMX% を処理し、次のようなメモリ集約型の集計でスキューが発生します。 collect_list
. このジョブは、非常に大きなデータ フレーム (2.2 TB) もキャッシュします。 次の vCPU とメモリ構成を使用して、EMR サーバーレスで同じジョブを XNUMX 回実行してみましょう。
以前に可能な最大のワーカー構成でテスト 3 を実行してみましょう。
- EMR サーバーレス アプリケーションの作成中に設定されたエグゼキュータのサイズ = 4 vCPU、30 GB メモリ、200 GB ディスク
- Spark ジョブ構成:
spark.executor.cores
= 4spark.executor.memory
= 27G
新しくリリースされた大規模なワーカー構成でテスト 4 を実行してみましょう。
- EMR サーバーレス アプリケーションの作成中に設定された executor のサイズ = 8 vCPU、60 GB メモリ、200 GB ディスク
- Spark ジョブ構成:
spark.executor.cores
= 8spark.executor.memory
= 54G
テスト 3 は失敗しました FetchFailedException
これは、エグゼキューターのメモリがジョブに対して十分でないために発生しました。
また、テスト 3 の Spark UI から、エグゼキュータの予約ストレージ メモリがデータ フレームのキャッシュに完全に利用されていることがわかります。
エグゼキュータの stderr
ログ:
永続化されたデータ フレームの約 33% がディスクにキャッシュされました。 Storage Spark UI のタブ。
より大きなエグゼキューターと仮想コアを使用したテスト 4 は、メモリ関連のエラーをスローすることなく正常に実行されました。 また、データ フレームの約 2.2% のみがディスクにキャッシュされました。 したがって、データ フレームのキャッシュされたブロックは、ディスクではなくメモリから取得されるため、パフォーマンスが向上します。
提言
大規模なワーカーがメモリ集約型の Spark アプリケーションに役立つかどうかを判断するには、次の点を考慮してください。
- Spark UI を見て、Spark アプリケーションにデータの偏りがあるかどうかを判断します。 次の Spark UI のスクリーンショットは、145.2 つのタスクがほとんどのデータ (XNUMX GB) を処理するデータ スキュー シナリオの例を示しています。 シャッフル読み取り サイズ。 60 つまたは少数のタスクが他のタスクよりも大幅に多くのデータを処理する場合は、120 ~ XNUMX G のメモリを備えたより大きなワーカーでアプリケーションを再実行します (
spark.executor.memory
の 54% を考慮して 109 ~ 10 GB の任意の場所に設定します。spark.executor.memoryOverhead
).
- チェック Storage Spark History Server のタブを使用して、メモリにキャッシュされたデータとディスクの比率を確認します。 メモリ内のサイズ および ディスクのサイズ 列。 データの 10% 以上がディスクにキャッシュされている場合は、より大きなワーカーでアプリケーションを再実行して、メモリにキャッシュされるデータの量を増やします。
- ジョブがより多くのメモリを必要とするかどうかを事前に判断するもう XNUMX つの方法は、監視することです。 ピーク時の JVM メモリ Spark UI で 執行者 タブ。 使用されるピーク JVM メモリがエグゼキュータまたはドライバ メモリに近い場合は、より大きなワーカーを使用してアプリケーションを作成し、より高い値を設定できます。
spark.executor.memory
orspark.driver.memory
. たとえば、次のスクリーンショットでは、ピーク時の JVM メモリ使用量の最大値は 26 GB であり、spark.executor.memory
は 27 G に設定されています。この場合、60 GB のメモリを備えたより大きなワーカーを使用し、spark.executor.memory
54Gに設定。
考慮事項
大規模な vCPU はシャッフル ブロックの局所性を高めるのに役立ちますが、ディスク スループット、ディスク IOPS (XNUMX 秒あたりの入出力操作数)、ネットワーク帯域幅など、他の要因も関係します。 場合によっては、より多くのディスクを備えた小規模なワーカーを増やすと、大規模なワーカーが少ない場合と比較して、ディスク IOPS、スループット、およびネットワーク帯域幅全体が高くなる可能性があります。 ワークロードに最適な構成を選択するために、適切な vCPU 構成に対してワークロードをベンチマークすることをお勧めします。
シャッフルが多いジョブの場合は、大きなディスクを使用することをお勧めします。 アプリケーションの作成時に、各ワーカーに最大 200 GB のディスクを接続できます。 大規模な vCPU の使用 (spark.executor.cores
) executor ごとに使用すると、各ワーカーのディスク使用率が増加する可能性があります。 シャッフル データをディスクに収めることができないためにアプリケーションが「デバイスに空き領域がありません」で失敗する場合は、200 GB ディスクでより小さなワーカーを使用してください。
まとめ
この投稿では、EMR サーバーレス ジョブに大規模なエグゼキューターを使用する利点について学びました。 さまざまなワーカー構成の詳細については、次を参照してください。 ワーカー構成. 大規模なワーカー構成は、EMR サーバーレスが利用可能なすべてのリージョンで利用できます 利用できます.
著者について
ヴィーナ・ヴァスデバン AWS のシニア パートナー ソリューション アーキテクトであり、ビッグデータと分析を専門とする Amazon EMR スペシャリストです。 彼女は、顧客とパートナーが高度に最適化され、スケーラブルで安全なソリューションを構築するのを支援しています。 アーキテクチャを近代化します。 ビッグデータ ワークロードを AWS に移行します。
- SEO を活用したコンテンツと PR 配信。 今日増幅されます。
- Platoblockchain。 Web3メタバースインテリジェンス。 知識の増幅。 こちらからアクセスしてください。
- 情報源: https://aws.amazon.com/blogs/big-data/amazon-emr-serverless-supports-larger-worker-sizes-to-run-more-compute-and-memory-intensive-workloads/
- 1
- 10
- 100
- 11
- 2%
- 7
- 70
- 9
- 視聴者の38%が
- a
- 私たちについて
- 越えて
- Action
- に対して
- 凝集
- すべて
- 割り当てられました
- 配分
- 許可
- ことができます
- Amazon
- アマゾンEMR
- 量
- 分析
- 分析論
- および
- アナウンス
- どこにでも
- アパッチ
- Apache Spark
- API
- 申し込み
- 適用
- アタッチ
- オートマチック
- 自動的に
- 利用できます
- 避ける
- AWS
- 帯域幅
- ベース
- なぜなら
- さ
- ベンチマーク
- 有益な
- 恩恵
- 利点
- BEST
- より良いです
- ビッグ
- ビッグデータ
- ブロック
- ブロック
- 後押し
- ボトルネック
- ビルド
- キャッシュ
- 容量
- 場合
- 例
- 生じました
- 変更
- 変化
- 選択する
- 閉じる
- クラスタ
- コード
- 収集する
- コレクション
- コラム
- コマンドと
- 一般に
- 比べ
- 妥協する
- 計算
- 計算
- 構成
- 検討
- 整合性のある
- コンテキスト
- 対応する
- 費用
- 可能性
- 作ります
- 作成
- Customers
- DAG
- データ
- データセット
- デフォルト
- 定義済みの
- 度
- 実証します
- 検出
- 決定する
- 違い
- 異なります
- 話し合います
- ディストリビューション
- そうではありません
- ドント
- ダウン
- ドライバー
- 間に
- ダイナミック
- 各
- 可能
- 奨励する
- 十分な
- エラー
- 特に
- エーテル(ETH)
- さらに
- あらゆる
- 例
- 例
- 交換
- 交換について
- 興奮した
- 実行します
- 予想される
- 高価な
- 広範囲
- 要因
- Failed:
- 失敗
- 遠く
- 速いです
- 実行可能な
- フェッチ
- 少数の
- File
- フィット
- 固定の
- 焦点
- フォロー中
- 次
- FRAME
- フレームワーク
- 詐欺
- 不正検出
- から
- 完全に
- 機能
- 取得する
- 半分
- ハンドル
- 持って
- 助けます
- ことができます
- より高い
- 非常に
- history
- ハイブ
- しかしながら
- HTML
- HTTPS
- 巨大な
- 影響
- 改善します
- 改善されました
- 改善
- 改善
- in
- できないこと
- 所得
- 入ってくる
- 増える
- 増加した
- info
- 情報
- を取得する必要がある者
- 関係する
- IT
- 繰り返し
- ジョブ
- Jobs > Create New Job
- join
- キープ
- 大
- より大きい
- 最大の
- 起動する
- つながる
- リード
- 学んだ
- レベル
- LIMIT
- ローカル
- 局部的に
- 見て
- 探して
- 作成
- 管理する
- だけど
- 大会
- メモリ
- メトリック
- 移動します
- 分
- モード
- 近代化します
- モニタリング
- 他には?
- 最も
- MS
- の試合に
- 必要
- ニーズ
- ネットワーク
- ノイズ
- 通常の
- 数
- 入手する
- 提供
- 提供すること
- オファー
- ONE
- オープンソース
- 操作
- 業務執行統括
- 最適化
- 最適化
- 注文
- その他
- 全体
- パートナー
- パートナー
- ピーク
- 実行する
- パフォーマンス
- 実行する
- プラトン
- プラトンデータインテリジェンス
- プラトデータ
- 人口
- 可能
- ポスト
- 潜在的な
- :
- 前に
- プロセス
- ラボレーション
- 処理
- プロダクト
- 生産
- 提供します
- 比
- 読む
- 推奨される
- 記録
- 減らします
- 地域
- リリース
- 残り
- リモート
- 要求されました
- 要件
- 必要
- 予約済み
- リソースを追加する。
- リソース
- レビュー
- ラン
- ランニング
- 同じ
- Save
- ド電源のデ
- 規模
- 秤
- スケーリング
- シナリオ
- シナリオ
- 二番
- 秒
- セクション
- 安全に
- シニア
- サーバレス
- サーバー
- セッションに
- 設定
- いくつかの
- shared
- 示す
- 作品
- シャッフル
- 重要
- 著しく
- 同様の
- 同様に
- サイズ
- サイズ
- 斜め
- 小さい
- より小さい
- So
- これまでのところ
- ソリューション
- 一部
- スペース
- スパーク
- 専門家
- SQL
- ステージ
- ステージ
- ストレージ利用料
- 店舗
- 作戦
- 首尾よく
- そのような
- 突然の
- 十分な
- 適当
- サポート
- 発生します
- がち
- テーブル
- 取る
- 取り
- 取得
- 仕事
- タスク
- 条件
- test
- テスト
- アプリ環境に合わせて
- したがって、
- スループット
- 投げる
- 時間
- <font style="vertical-align: inherit;">回数</font>
- 〜へ
- ツール
- top
- トータル
- 追跡する
- 変換
- 変換
- 順番
- 一般的に
- ui
- 根本的な
- 前例のない
- 使用法
- つかいます
- 利用された
- 値
- ボリューム
- ボリューム
- which
- while
- ワイド
- 意志
- 無し
- ワーカー
- 労働者
- 書きます
- あなたの
- ゼファーネット