Amazon EMR Serverless unterstützt größere Worker-Größen, um rechen- und speicherintensivere Workloads auszuführen

Amazon EMR Serverless unterstützt größere Worker-Größen, um rechen- und speicherintensivere Workloads auszuführen

Quellknoten: 1960092

Amazon EMR ohne Server ermöglicht es Ihnen, Open-Source-Big-Data-Frameworks wie Apache Spark und Apache Hive auszuführen, ohne Cluster und Server verwalten zu müssen. Mit EMR Serverless können Sie Analyse-Workloads in jeder Größenordnung mit automatischer Skalierung ausführen, die die Ressourcen in Sekundenschnelle an sich ändernde Datenmengen und Verarbeitungsanforderungen anpasst. EMR Serverless skaliert Ressourcen automatisch nach oben und unten, um genau die richtige Menge an Kapazität für Ihre Anwendung bereitzustellen.

Wir freuen uns, Ihnen mitteilen zu können, dass EMR Serverless jetzt Worker-Konfigurationen mit 8 vCPUs mit bis zu 60 GB Arbeitsspeicher und 16 vCPUs mit bis zu 120 GB Arbeitsspeicher anbietet, sodass Sie rechen- und speicherintensivere Workloads auf EMR Serverless ausführen können. Eine serverlose EMR-Anwendung verwendet intern Worker, um Workloads auszuführen. und Sie können basierend auf Ihren Workload-Anforderungen verschiedene Worker-Konfigurationen konfigurieren. Zuvor war die größte Worker-Konfiguration, die auf EMR Serverless verfügbar war, 4 vCPUs mit bis zu 30 GB Arbeitsspeicher. Diese Funktion ist besonders vorteilhaft für die folgenden gängigen Szenarien:

  • Shuffle-lastige Workloads
  • Speicherintensive Workloads

Sehen wir uns jeden dieser Anwendungsfälle und die Vorteile größerer Worker-Größen an.

Vorteile der Verwendung großer Worker für Shuffle-intensive Workloads

In Spark und Hive tritt Shuffle auf, wenn Daten während einer Berechnung über den Cluster neu verteilt werden müssen. Wenn Ihre Anwendung umfangreiche Transformationen durchführt oder Operationen wie z join, groupBy, sortBy, oder repartition, Spark und Hive löst ein Mischen aus. Außerdem ist jede Spark-Stufe und jeder Tez-Vertex durch eine Shuffle-Operation begrenzt. Am Beispiel von Spark gibt es standardmäßig 200 Partitionen für jeden Spark-Job, der von definiert wird spark.sql.shuffle.partitions. Spark berechnet die Anzahl der Aufgaben jedoch spontan basierend auf der Datengröße und der ausgeführten Operation. Wenn eine umfassende Transformation auf einem großen Dataset durchgeführt wird, können GB oder sogar TB an Daten vorhanden sein, die von allen Aufgaben abgerufen werden müssen.

Shuffles sind in der Regel zeit- und ressourcenintensiv und können zu Leistungsengpässen führen. Daher kann die Optimierung von Shuffles erhebliche Auswirkungen auf die Leistung und die Kosten eines Spark-Jobs haben. Bei großen Workern können dem Speicher jedes Executors mehr Daten zugewiesen werden, wodurch die zwischen den Executors hin- und hergeschobenen Daten minimiert werden. Dies wiederum führt zu einer erhöhten Shuffle-Leseleistung, da mehr Daten lokal von demselben Worker abgerufen werden und weniger Daten remote von anderen Workern abgerufen werden.

Experimente

Um die Vorteile der Verwendung großer Worker für shuffle-intensive Abfragen zu demonstrieren, verwenden wir q78 von TPC-DS, einer shuffle-lastigen Spark-Abfrage, die 167 GB Daten über 12 Spark-Stufen shufflet. Lassen Sie uns zwei Iterationen derselben Abfrage mit unterschiedlichen Konfigurationen durchführen.

Die Konfigurationen für Test 1 sind wie folgt:

  • Größe des beim Erstellen der serverlosen EMR-Anwendung angeforderten Executors = 4 vCPUs, 8 GB Arbeitsspeicher, 200 GB Festplatte
  • Spark-Job-Konfiguration:
    • spark.executor.cores = 4
    • spark.executor.memory = 8
    • spark.executor.instances = 48
    • Parallelität = 192 (spark.executor.instances * spark.executor.cores)

Die Konfigurationen für Test 2 sind wie folgt:

  • Größe des beim Erstellen der serverlosen EMR-Anwendung angeforderten Executors = 8 vCPUs, 16 GB Arbeitsspeicher, 200 GB Festplatte
  • Spark-Job-Konfiguration:
    • spark.executor.cores = 8
    • spark.executor.memory = 16
    • spark.executor.instances = 24
    • Parallelität = 192 (spark.executor.instances * spark.executor.cores)

Lassen Sie uns auch die dynamische Zuordnung durch Einstellung deaktivieren spark.dynamicAllocation.enabled zu false für beide Tests, um potenzielles Rauschen aufgrund variabler Executor-Startzeiten zu vermeiden und die Ressourcennutzung für beide Tests konsistent zu halten. Wir gebrauchen Funkenmessung, ein Open-Source-Tool, das die Erfassung und Analyse von Spark-Leistungsmetriken vereinfacht. Da wir eine feste Anzahl von Executoren verwenden, ist die Gesamtzahl der angeforderten vCPUs und Speicher für beide Tests gleich. Die folgende Tabelle fasst die Beobachtungen aus den mit Spark Measure gesammelten Metriken zusammen.

. Gesamtdauer der Abfrage in Millisekunden shuffleLocalBlocksGeholt shuffleRemoteBlocksAbgeholt shuffleLocalBytesRead shuffleRemoteBytesRead shuffleFetchWaitTime shuffleWriteTime
Testen 1 153244 114175 5291825 3.5 GB 163.1 GB 1.9 hr 4.7 min
Testen 2 108136 225448 5185552 6.9 GB 159.7 GB 3.2 min 5.2 min

Wie aus der Tabelle ersichtlich ist, gibt es aufgrund von Shuffle-Verbesserungen einen signifikanten Leistungsunterschied. Test 2, mit der halben Anzahl von Executoren, die doppelt so groß sind wie Test 1, lief 29.44 % schneller, mit 1.97-mal mehr lokal abgerufenen Shuffle-Daten im Vergleich zu Test 1 für dieselbe Abfrage, dieselbe Parallelität und dieselben aggregierten vCPU- und Speicherressourcen . Daher können Sie mit Hilfe großer Executors von einer verbesserten Leistung profitieren, ohne Kompromisse bei den Kosten oder der Jobparallelität eingehen zu müssen. Wir haben ähnliche Leistungsvorteile für andere Shuffle-intensive TPC-DS-Abfragen wie z q23a und q23b.

Empfehlungen

Berücksichtigen Sie Folgendes, um festzustellen, ob die großen Worker Ihren shuffle-intensiven Spark-Anwendungen zugute kommen:

  • Prüfen Sie die Praktika Registerkarte aus der Benutzeroberfläche des Spark-Verlaufsservers Ihrer serverlosen EMR-Anwendung. Aus dem folgenden Screenshot von Spark History Server können wir beispielsweise feststellen, dass dieser Spark-Job 167 GB Shuffle-Daten geschrieben und gelesen hat, die über 12 Stufen aggregiert wurden, wenn man sich die ansieht Shuffle-Lesen und Shuffle Write Säulen. Wenn Ihre Jobs mehr als 50 GB an Daten verschieben, können Sie möglicherweise von größeren Workern mit 8 oder 16 vCPUs oder vCPUs profitieren spark.executor.cores.

  • Prüfen Sie die SQL / Datenrahmen auf der Spark History Server-Benutzeroberfläche Ihrer serverlosen EMR-Anwendung (nur für Dataframe- und Dataset-APIs). Wenn Sie die durchgeführte Spark-Aktion auswählen, z. B. „Collect“, „Take“, „ShowString“ oder „Save“, sehen Sie einen aggregierten DAG für alle Phasen, getrennt nach den Austauschvorgängen. Jeder Austausch im DAG entspricht einem Shuffle-Vorgang und enthält die lokalen und Remote-Bytes und -Blöcke gemischt, wie im folgenden Screenshot zu sehen ist. Wenn die abgerufenen lokalen Shuffle-Blöcke oder -Bytes im Vergleich zu den abgerufenen Remote-Blöcken oder -Bytes viel geringer sind, können Sie Ihre Anwendung mit größeren Workern (mit 8 oder 16 vCPUs oder spark.executor.cores) erneut ausführen und diese Austauschmetriken in einem DAG überprüfen mal sehen ob es eine besserung gibt.

  • Verwenden Sie das Funkenmessung Tool mit Ihrer Spark-Abfrage, um die Shuffle-Metriken in den Spark-Treibern zu erhalten stdout logs, wie im folgenden Protokoll für einen Spark-Auftrag gezeigt. Überprüfen Sie die für Shuffle-Reads benötigte Zeit (shuffleFetchWaitTime) und Shuffle schreibt (shuffleWriteTime) und das Verhältnis der abgerufenen lokalen Bytes zu den abgerufenen entfernten Bytes. Wenn der Shuffle-Vorgang länger als 2 Minuten dauert, führen Sie Ihre Anwendung mit größeren Workern (mit 8 oder 16 vCPUs bzw spark.executor.cores) mit Spark Measure, um die Verbesserung der Shuffle-Leistung und der gesamten Auftragslaufzeit zu verfolgen.
Time taken: 177647 ms Scheduling mode = FIFO
Spark Context default degree of parallelism = 192 Aggregated Spark stage metrics:
numStages => 22
numTasks => 10156
elapsedTime => 159894 (2.7 min)
stageDuration => 456893 (7.6 min)
executorRunTime => 28418517 (7.9 h)
executorCpuTime => 20276736 (5.6 h)
executorDeserializeTime => 326486 (5.4 min)
executorDeserializeCpuTime => 124323 (2.1 min)
resultSerializationTime => 534 (0.5 s)
jvmGCTime => 648809 (11 min)
shuffleFetchWaitTime => 340880 (5.7 min)
shuffleWriteTime => 245918 (4.1 min)
resultSize => 23199434 (22.1 MB)
diskBytesSpilled => 0 (0 Bytes)
memoryBytesSpilled => 0 (0 Bytes)
peakExecutionMemory => 1794288453176
recordsRead => 18696929278
bytesRead => 77354154397 (72.0 GB)
recordsWritten => 0
bytesWritten => 0 (0 Bytes)
shuffleRecordsRead => 14124240761
shuffleTotalBlocksFetched => 5571316
shuffleLocalBlocksFetched => 117321
shuffleRemoteBlocksFetched => 5453995
shuffleTotalBytesRead => 158582120627 (147.7 GB)
shuffleLocalBytesRead => 3337930126 (3.1 GB)
shuffleRemoteBytesRead => 155244190501 (144.6 GB)
shuffleRemoteBytesReadToDisk => 0 (0 Bytes)
shuffleBytesWritten => 156913371886 (146.1 GB)
shuffleRecordsWritten => 13867102620

Vorteile der Verwendung großer Worker für arbeitsspeicherintensive Workloads

Bestimmte Arten von Workloads sind arbeitsspeicherintensiv und können von mehr pro Worker konfiguriertem Arbeitsspeicher profitieren. In diesem Abschnitt diskutieren wir gängige Szenarien, in denen große Worker für die Ausführung arbeitsspeicherintensiver Workloads von Vorteil sein könnten.

Datenverzerrung

Datenverzerrungen treten häufig in mehreren Arten von Datensätzen auf. Einige gängige Beispiele sind Betrugserkennung, Bevölkerungsanalyse und Einkommensverteilung. Wenn Sie beispielsweise Anomalien in Ihren Daten erkennen möchten, wird erwartet, dass nur weniger als 1 % der Daten abnormal sind. Wenn Sie eine Aggregation zusätzlich zu normalen und abnormalen Datensätzen durchführen möchten, werden 99 % der Daten von einem einzelnen Worker verarbeitet, was dazu führen kann, dass diesem Worker nicht mehr genügend Arbeitsspeicher zur Verfügung steht. Bei speicherintensiven Transformationen wie z. B. können Datenverzerrungen beobachtet werden groupBy, orderBy, join, Fensterfunktionen, collect_list, collect_set, usw. Join-Typen wie z BroadcastNestedLoopJoin und kartesische Produkte sind ebenfalls von Natur aus speicherintensiv und anfällig für Datenverzerrungen. Wenn Ihre Eingabedaten Gzip-komprimiert sind, kann eine einzelne Gzip-Datei nicht von mehr als einer Aufgabe gelesen werden, da der Gzip-Komprimierungstyp nicht teilbar ist. Wenn die Eingabe einige sehr große Gzip-Dateien enthält, kann es sein, dass Ihrem Job der Arbeitsspeicher ausgeht, da eine einzelne Aufgabe möglicherweise eine riesige Gzip-Datei lesen muss, die nicht in den Arbeitsspeicher des Executors passt.

Fehler aufgrund von Datenverzerrung können durch Anwendung von Strategien wie Salting gemildert werden. Dies erfordert jedoch häufig umfangreiche Änderungen am Code, die für einen Produktions-Workload möglicherweise nicht durchführbar sind, der aufgrund einer beispiellosen Datenverzerrung aufgrund eines plötzlichen Anstiegs des eingehenden Datenvolumens fehlgeschlagen ist. Für eine einfachere Problemumgehung möchten Sie möglicherweise nur den Arbeitsspeicher erhöhen. Einsatz größerer Arbeiter mit mehr spark.executor.memory ermöglicht es Ihnen, Datenverzerrungen zu behandeln, ohne Änderungen an Ihrem Anwendungscode vorzunehmen.

Caching

Um die Leistung zu verbessern, ermöglicht Ihnen Spark, die Datenrahmen, Datasets und RDDs im Arbeitsspeicher zwischenzuspeichern. Dadurch können Sie einen Datenrahmen mehrmals in Ihrer Anwendung wiederverwenden, ohne ihn neu berechnen zu müssen. Standardmäßig werden bis zu 50 % der JVM Ihres Executors verwendet, um die Datenframes basierend auf dem Cache zwischenzuspeichern property spark.memory.storageFraction. Zum Beispiel, wenn Ihr spark.executor.memory auf 30 GB eingestellt ist, dann werden 15 GB für den Cache-Speicher verwendet, der nicht entfernt werden kann.

Die Standardspeicherebene des Cache-Betriebs ist DISK_AND_MEMORY. Wenn die Größe des Datenrahmens, den Sie zwischenspeichern möchten, nicht in den Arbeitsspeicher des Executors passt, wird ein Teil des Caches auf die Festplatte übertragen. Wenn nicht genügend Speicherplatz vorhanden ist, um die zwischengespeicherten Daten auf die Festplatte zu schreiben, werden die Blöcke entfernt und Sie können die Vorteile des Zwischenspeicherns nicht nutzen. Durch die Verwendung größerer Worker können Sie mehr Daten im Arbeitsspeicher zwischenspeichern und die Arbeitsleistung steigern, indem Sie zwischengespeicherte Blöcke aus dem Arbeitsspeicher und nicht aus dem zugrunde liegenden Speicher abrufen.

Experimente

Zum Beispiel die folgenden PySpark-Job führt zu einer Verzerrung, bei der ein Ausführender 99.95 % der Daten mit speicherintensiven Aggregaten wie verarbeitet collect_list. Der Job speichert auch einen sehr großen Datenrahmen (2.2 TB). Lassen Sie uns zwei Iterationen desselben Jobs auf EMR Serverless mit den folgenden vCPU- und Speicherkonfigurationen ausführen.

Lassen Sie uns Test 3 mit den bisher größtmöglichen Worker-Konfigurationen ausführen:

  • Größe des Executor-Satzes beim Erstellen der serverlosen EMR-Anwendung = 4 vCPUs, 30 GB Arbeitsspeicher, 200 GB Festplatte
  • Spark-Job-Konfiguration:
    • spark.executor.cores = 4
    • spark.executor.memory = 27G

Lassen Sie uns Test 4 mit den neu veröffentlichten großen Worker-Konfigurationen ausführen:

  • Größe des beim Erstellen der serverlosen EMR-Anwendung festgelegten Executors = 8 vCPUs, 60 GB Arbeitsspeicher, 200 GB Festplatte
  • Spark-Job-Konfiguration:
    • spark.executor.cores = 8
    • spark.executor.memory = 54G

Test 3 ist mit fehlgeschlagen FetchFailedException, was darauf zurückzuführen war, dass der Executor-Speicher für den Job nicht ausreichte.

Außerdem sehen wir in der Spark-Benutzeroberfläche von Test 3, dass der reservierte Speicher der Executors vollständig zum Zwischenspeichern der Datenframes genutzt wurde.

Die verbleibenden Blöcke zum Cachen wurden auf die Festplatte verschüttet, wie im Executor zu sehen ist stderr Protokolle:

23/02/06 16:06:58 INFO MemoryStore: Will not store rdd_4_1810
23/02/06 16:06:58 WARN MemoryStore: Not enough space to cache rdd_4_1810 in memory! (computed 134.1 MiB so far)
23/02/06 16:06:58 INFO MemoryStore: Memory use = 14.8 GiB (blocks) + 507.5 MiB (scratch space shared across 4 tasks(s)) = 15.3 GiB. Storage limit = 15.3 GiB.
23/02/06 16:06:58 WARN BlockManager: Persisting block rdd_4_1810 to disk instead.

Etwa 33 % des persistenten Datenrahmens wurden auf der Festplatte zwischengespeichert, wie auf der zu sehen ist Lagerung Registerkarte der Spark-Benutzeroberfläche.

Test 4 mit größeren Executoren und virtuellen Kernen wurde erfolgreich ausgeführt, ohne dass speicherbezogene Fehler ausgegeben wurden. Außerdem wurden nur etwa 2.2 % des Datenrahmens auf der Festplatte zwischengespeichert. Daher werden zwischengespeicherte Blöcke eines Datenrahmens aus dem Arbeitsspeicher und nicht von der Festplatte abgerufen, was eine bessere Leistung bietet.

Empfehlungen

Berücksichtigen Sie Folgendes, um festzustellen, ob die großen Worker Ihren speicherintensiven Spark-Anwendungen zugute kommen:

  • Bestimmen Sie, ob Ihre Spark-Anwendung irgendwelche Datenverzerrungen aufweist, indem Sie sich die Spark-Benutzeroberfläche ansehen. Der folgende Screenshot der Spark-Benutzeroberfläche zeigt ein Beispielszenario für eine Datenverzerrung, bei dem eine Aufgabe die meisten Daten verarbeitet (145.2 GB). Shuffle-Lesen Größe. Wenn eine oder weniger Aufgaben deutlich mehr Daten verarbeiten als andere Aufgaben, führen Sie Ihre Anwendung mit größeren Workern mit 60–120 G Arbeitsspeicher erneut aus (spark.executor.memory Stellen Sie zwischen 54 und 109 GB ein, wenn Sie 10 % von einbeziehen spark.executor.memoryOverhead).

  • Prüfen Sie die Lagerung Registerkarte des Spark-Verlaufsservers, um das Verhältnis der im Arbeitsspeicher zwischengespeicherten Daten auf der Festplatte zu überprüfen Größe im Speicher und Größe auf der Festplatte Säulen. Wenn mehr als 10 % Ihrer Daten auf der Festplatte zwischengespeichert werden, führen Sie Ihre Anwendung mit größeren Workern erneut aus, um die im Arbeitsspeicher zwischengespeicherte Datenmenge zu erhöhen.
  • Eine weitere Möglichkeit, präventiv festzustellen, ob Ihr Job mehr Arbeitsspeicher benötigt, ist die Überwachung Spitzenwert des JVM-Speichers auf der Spark-Benutzeroberfläche Vollstrecker Tab. Wenn der Spitzenwert des JVM-Arbeitsspeichers nahe dem Executor- oder Treiberarbeitsspeicher liegt, können Sie eine Anwendung mit einem größeren Worker erstellen und einen höheren Wert für konfigurieren spark.executor.memory or spark.driver.memory. Im folgenden Screenshot beträgt der Höchstwert für die Spitzenauslastung des JVM-Speichers beispielsweise 26 GB und spark.executor.memory auf 27 G eingestellt ist. In diesem Fall kann es vorteilhaft sein, größere Worker mit 60 GB Speicher zu verwenden und spark.executor.memory auf 54 G eingestellt.

Überlegungen

Obwohl große vCPUs dazu beitragen, die Lokalität der Shuffle-Blöcke zu erhöhen, spielen andere Faktoren wie Festplattendurchsatz, Festplatten-IOPS (Eingabe-/Ausgabeoperationen pro Sekunde) und Netzwerkbandbreite eine Rolle. In einigen Fällen könnten mehr kleine Worker mit mehr Festplatten im Vergleich zu weniger großen Workern insgesamt höhere Festplatten-IOPS, Durchsatz und Netzwerkbandbreite bieten. Wir empfehlen Ihnen, Ihre Workloads mit geeigneten vCPU-Konfigurationen zu vergleichen, um die beste Konfiguration für Ihre Workload auszuwählen.

Für Jobs mit vielen Zufallswiedergaben wird empfohlen, große Festplatten zu verwenden. Beim Erstellen Ihrer Anwendung können Sie jedem Worker bis zu 200 GB Festplatte hinzufügen. Verwenden großer vCPUs (spark.executor.cores) pro Executor kann die Festplattenauslastung auf jedem Worker erhöhen. Wenn Ihre Anwendung mit „No space left on device“ fehlschlägt, weil die Shuffle-Daten nicht auf die Festplatte passen, verwenden Sie kleinere Worker mit einer 200-GB-Festplatte.

Zusammenfassung

In diesem Beitrag haben Sie die Vorteile der Verwendung großer Executors für Ihre serverlosen EMR-Jobs kennengelernt. Weitere Informationen zu verschiedenen Worker-Konfigurationen finden Sie unter Worker-Konfigurationen. Große Worker-Konfigurationen sind in allen Regionen verfügbar, in denen EMR Serverless verfügbar ist verfügbar.


Über den Autor

Veena Vasudevan ist Senior Partner Solutions Architect und Amazon EMR-Spezialist bei AWS mit Schwerpunkt auf Big Data und Analysen. Sie hilft Kunden und Partnern beim Aufbau hochoptimierter, skalierbarer und sicherer Lösungen. modernisieren ihre Architekturen; und ihre Big-Data-Workloads zu AWS migrieren.

Zeitstempel:

Mehr von AWS Big Data