Amazon EMR Serverless supporta worker di dimensioni maggiori per eseguire più carichi di lavoro a uso intensivo di memoria e di calcolo

Amazon EMR Serverless supporta worker di dimensioni maggiori per eseguire più carichi di lavoro a uso intensivo di memoria e di calcolo

Nodo di origine: 1960092

Amazon EMR senza server consente di eseguire framework di big data open source come Apache Spark e Apache Hive senza gestire cluster e server. Con EMR Serverless, puoi eseguire carichi di lavoro di analisi su qualsiasi scala con scalabilità automatica che ridimensiona le risorse in pochi secondi per soddisfare i cambiamenti dei volumi di dati e dei requisiti di elaborazione. EMR Serverless ridimensiona automaticamente le risorse su e giù per fornire la giusta quantità di capacità per la tua applicazione.

Siamo lieti di annunciare che EMR Serverless ora offre configurazioni worker di 8 vCPU con un massimo di 60 GB di memoria e 16 vCPU con un massimo di 120 GB di memoria, consentendo di eseguire più carichi di lavoro di calcolo e ad alta intensità di memoria su EMR Serverless. Un'applicazione EMR Serverless utilizza internamente i worker per eseguire i carichi di lavoro. ed è possibile configurare diverse configurazioni dei lavoratori in base ai requisiti del carico di lavoro. In precedenza, la più grande configurazione di lavoro disponibile su EMR Serverless era di 4 vCPU con un massimo di 30 GB di memoria. Questa funzionalità è particolarmente vantaggiosa per i seguenti scenari comuni:

  • Carichi di lavoro pesanti
  • Carichi di lavoro a uso intensivo di memoria

Diamo un'occhiata a ciascuno di questi casi d'uso e ai vantaggi di avere lavoratori di dimensioni maggiori.

Vantaggi dell'utilizzo di grandi worker per carichi di lavoro ad alta intensità di shuffle

In Spark e Hive, lo shuffle si verifica quando i dati devono essere ridistribuiti nel cluster durante un calcolo. Quando l'applicazione esegue ampie trasformazioni o riduce operazioni come join, groupBy, sortBy, o repartition, Spark e Hive innescano uno shuffle. Inoltre, ogni fase Spark e vertice Tez è delimitata da un'operazione di riordino. Prendendo Spark come esempio, per impostazione predefinita, ci sono 200 partizioni per ogni lavoro Spark definito da spark.sql.shuffle.partitions. Tuttavia, Spark calcolerà al volo il numero di attività in base alla dimensione dei dati e all'operazione eseguita. Quando viene eseguita un'ampia trasformazione su un set di dati di grandi dimensioni, potrebbero esserci GB o addirittura TB di dati che devono essere recuperati da tutte le attività.

Gli shuffle sono in genere costosi in termini di tempo e risorse e possono portare a colli di bottiglia delle prestazioni. Pertanto, l'ottimizzazione degli shuffle può avere un impatto significativo sulle prestazioni e sul costo di un job Spark. Con i lavoratori di grandi dimensioni, è possibile allocare più dati alla memoria di ciascun esecutore, il che riduce al minimo i dati scambiati tra gli esecutori. Questo, a sua volta, porta a un aumento delle prestazioni di lettura casuale perché più dati verranno recuperati localmente dallo stesso lavoratore e meno dati verranno recuperati in remoto da altri lavoratori.

Esperimenti

Per dimostrare i vantaggi dell'utilizzo di grandi worker per query a uso intensivo di shuffle, usiamo q78 da TPC-DS, che è una query Spark pesantemente casuale che mescola 167 GB di dati su 12 fasi Spark. Eseguiamo due iterazioni della stessa query con configurazioni diverse.

Le configurazioni per il Test 1 sono le seguenti:

  • Dimensione dell'esecutore richiesta durante la creazione dell'applicazione EMR Serverless = 4 vCPU, 8 GB di memoria, 200 GB di disco
  • Configurazione processo Spark:
    • spark.executor.cores = 4
    • spark.executor.memory = 8
    • spark.executor.instances = 48
    • Parallelismo = 192 (spark.executor.instances * spark.executor.cores)

Le configurazioni per il Test 2 sono le seguenti:

  • Dimensione dell'esecutore richiesta durante la creazione dell'applicazione EMR Serverless = 8 vCPU, 16 GB di memoria, 200 GB di disco
  • Configurazione processo Spark:
    • spark.executor.cores = 8
    • spark.executor.memory = 16
    • spark.executor.instances = 24
    • Parallelismo = 192 (spark.executor.instances * spark.executor.cores)

Disabilitiamo anche l'allocazione dinamica impostando spark.dynamicAllocation.enabled a false per entrambi i test per evitare qualsiasi potenziale rumore dovuto a tempi di avvio variabili dell'esecutore e mantenere coerente l'utilizzo delle risorse per entrambi i test. Noi usiamo Misura scintilla, che è uno strumento open source che semplifica la raccolta e l'analisi delle metriche delle prestazioni di Spark. Poiché utilizziamo un numero fisso di esecutori, il numero totale di vCPU e memoria richieste è lo stesso per entrambi i test. La tabella seguente riepiloga le osservazioni delle metriche raccolte con Spark Measure.

. Tempo totale impiegato per la query in millisecondi shuffleLocalBlocksFetched shuffleRemoteBlocksFetched shuffleLocalBytesRead shuffleRemoteBytesRead shuffleFetchWaitTime shuffleWriteTime
Test 1 153244 114175 5291825 3.5 GB 163.1 GB 1.9 4.7 min
Test 2 108136 225448 5185552 6.9 GB 159.7 GB 3.2 min 5.2 min

Come si vede dalla tabella, c'è una differenza significativa nelle prestazioni dovuta ai miglioramenti dello shuffle. Il test 2, con la metà del numero di esecutori che è due volte più grande del test 1, è stato eseguito il 29.44% più velocemente, con 1.97 volte più dati shuffle recuperati localmente rispetto al test 1 per la stessa query, lo stesso parallelismo e le stesse risorse di memoria e vCPU aggregate . Pertanto, puoi beneficiare di prestazioni migliorate senza compromettere i costi o il parallelismo dei lavori con l'aiuto di esecutori di grandi dimensioni. Abbiamo osservato vantaggi in termini di prestazioni simili per altre query TPC-DS a uso intensivo di shuffle, ad esempio q23a ed q23b.

raccomandazioni

Per determinare se i lavoratori di grandi dimensioni andranno a vantaggio delle tue applicazioni Spark ad alta intensità di shuffle, considera quanto segue:

  • Controlla il stage dall'interfaccia utente di Spark History Server dell'applicazione EMR Serverless. Ad esempio, dallo screenshot seguente di Spark History Server, possiamo determinare che questo processo Spark ha scritto e letto 167 GB di dati shuffle aggregati in 12 fasi, osservando il Lettura casuale ed Scrivi in ​​modo casuale colonne. Se i tuoi lavori riordinano oltre 50 GB di dati, potresti potenzialmente trarre vantaggio dall'utilizzo di worker più grandi con 8 o 16 vCPU o spark.executor.cores.

  • Controlla il SQL/Frame dati dall'interfaccia utente di Spark History Server dell'applicazione EMR Serverless (solo per le API Dataframe e Dataset). Quando scegli l'azione Spark eseguita, ad esempio collect, take, showString o save, vedrai un DAG aggregato per tutte le fasi separate dagli scambi. Ogni scambio nel DAG corrisponde a un'operazione di shuffle e conterrà i byte e i blocchi locali e remoti rimescolati, come mostrato nello screenshot seguente. Se i blocchi o i byte shuffle locali recuperati sono molto inferiori rispetto ai blocchi o ai byte remoti recuperati, è possibile eseguire nuovamente l'applicazione con worker più grandi (con 8 o 16 vCPU o spark.executor.cores) ed esaminare queste metriche di scambio in un DAG per vedere se c'è qualche miglioramento.

  • Usa il Misura scintilla tool con la tua query Spark per ottenere le metriche di shuffle nei driver Spark stdout logs, come mostrato nel log seguente per un processo Spark. Esaminare il tempo impiegato per le letture casuali (shuffleFetchWaitTime) e shuffle scrive (shuffleWriteTime) e il rapporto tra i byte locali recuperati e i byte remoti recuperati. Se l'operazione di shuffle richiede più di 2 minuti, eseguire nuovamente l'applicazione con worker più grandi (con 8 o 16 vCPU o spark.executor.cores) con Spark Measure per tenere traccia del miglioramento delle prestazioni di riproduzione casuale e del tempo di esecuzione complessivo del lavoro.
Time taken: 177647 ms Scheduling mode = FIFO
Spark Context default degree of parallelism = 192 Aggregated Spark stage metrics:
numStages => 22
numTasks => 10156
elapsedTime => 159894 (2.7 min)
stageDuration => 456893 (7.6 min)
executorRunTime => 28418517 (7.9 h)
executorCpuTime => 20276736 (5.6 h)
executorDeserializeTime => 326486 (5.4 min)
executorDeserializeCpuTime => 124323 (2.1 min)
resultSerializationTime => 534 (0.5 s)
jvmGCTime => 648809 (11 min)
shuffleFetchWaitTime => 340880 (5.7 min)
shuffleWriteTime => 245918 (4.1 min)
resultSize => 23199434 (22.1 MB)
diskBytesSpilled => 0 (0 Bytes)
memoryBytesSpilled => 0 (0 Bytes)
peakExecutionMemory => 1794288453176
recordsRead => 18696929278
bytesRead => 77354154397 (72.0 GB)
recordsWritten => 0
bytesWritten => 0 (0 Bytes)
shuffleRecordsRead => 14124240761
shuffleTotalBlocksFetched => 5571316
shuffleLocalBlocksFetched => 117321
shuffleRemoteBlocksFetched => 5453995
shuffleTotalBytesRead => 158582120627 (147.7 GB)
shuffleLocalBytesRead => 3337930126 (3.1 GB)
shuffleRemoteBytesRead => 155244190501 (144.6 GB)
shuffleRemoteBytesReadToDisk => 0 (0 Bytes)
shuffleBytesWritten => 156913371886 (146.1 GB)
shuffleRecordsWritten => 13867102620

Vantaggi dell'utilizzo di grandi worker per carichi di lavoro a uso intensivo di memoria

Alcuni tipi di carichi di lavoro richiedono molta memoria e possono trarre vantaggio da più memoria configurata per lavoratore. In questa sezione, vengono illustrati scenari comuni in cui i lavoratori di grandi dimensioni potrebbero essere utili per l'esecuzione di carichi di lavoro a uso intensivo di memoria.

Disallineamento dei dati

Le asimmetrie dei dati si verificano comunemente in diversi tipi di set di dati. Alcuni esempi comuni sono il rilevamento delle frodi, l'analisi della popolazione e la distribuzione del reddito. Ad esempio, quando desideri rilevare anomalie nei tuoi dati, è previsto che solo meno dell'1% dei dati sia anomalo. Se si desidera eseguire un'aggregazione oltre ai record normali e anomali, il 99% dei dati verrà elaborato da un singolo lavoratore, il che potrebbe comportare l'esaurimento della memoria del lavoratore. È possibile osservare distorsioni dei dati per trasformazioni ad alta intensità di memoria come groupBy, orderBy, join, funzioni finestra, collect_list, collect_set, e così via. Unisciti a tipi come BroadcastNestedLoopJoin e il prodotto cartesano sono anche intrinsecamente ad alta intensità di memoria e suscettibili a distorsioni dei dati. Allo stesso modo, se i tuoi dati di input sono compressi con Gzip, un singolo file Gzip non può essere letto da più di un'attività perché il tipo di compressione Gzip non è divisibile. Quando ci sono alcuni file Gzip molto grandi nell'input, il tuo lavoro potrebbe esaurire la memoria perché una singola attività potrebbe dover leggere un enorme file Gzip che non si adatta alla memoria dell'esecutore.

I guasti dovuti alla distorsione dei dati possono essere mitigati applicando strategie come il salting. Tuttavia, ciò richiede spesso modifiche estese al codice, che potrebbero non essere fattibili per un carico di lavoro di produzione che ha avuto esito negativo a causa di una distorsione dei dati senza precedenti causata da un improvviso aumento del volume di dati in entrata. Per una soluzione più semplice, potresti semplicemente voler aumentare la memoria del lavoratore. Utilizzo di lavoratori più grandi con più spark.executor.memory consente di gestire la distorsione dei dati senza apportare modifiche al codice dell'applicazione.

Caching

Per migliorare le prestazioni, Spark consente di memorizzare nella cache i frame di dati, i set di dati e gli RDD. Ciò consente di riutilizzare più volte un frame di dati nell'applicazione senza doverlo ricalcolare. Per impostazione predefinita, fino al 50% della JVM dell'esecutore viene utilizzato per memorizzare nella cache i frame di dati basati su property spark.memory.storageFraction. Ad esempio, se il tuo spark.executor.memory è impostato su 30 GB, quindi 15 GB vengono utilizzati per l'archiviazione della cache che è immune all'eliminazione.

Il livello di archiviazione predefinito dell'operazione della cache è DISK_AND_MEMORY. Se la dimensione del frame di dati che stai tentando di memorizzare nella cache non rientra nella memoria dell'esecutore, una parte della cache viene trasferita sul disco. Se non c'è spazio sufficiente per scrivere i dati memorizzati nella cache nel disco, i blocchi vengono rimossi e non si ottengono i vantaggi della memorizzazione nella cache. L'utilizzo di worker più grandi consente di memorizzare nella cache più dati in memoria, migliorando le prestazioni del lavoro recuperando i blocchi memorizzati nella cache dalla memoria anziché dall'archiviazione sottostante.

Esperimenti

Ad esempio, il seguente Lavoro PySpark porta a un'inclinazione, con un esecutore che elabora il 99.95% dei dati con aggregati ad alta intensità di memoria come collect_list. Il lavoro memorizza anche nella cache un frame di dati molto grande (2.2 TB). Eseguiamo due iterazioni dello stesso processo su EMR Serverless con le seguenti configurazioni di vCPU e memoria.

Eseguiamo il test 3 con le configurazioni di lavoro più grandi possibili in precedenza:

  • Dimensione dell'esecutore impostato durante la creazione dell'applicazione EMR Serverless = 4 vCPU, 30 GB di memoria, 200 GB di disco
  • Configurazione processo Spark:
    • spark.executor.cores = 4
    • spark.executor.memory = 27 g

Eseguiamo il test 4 con le configurazioni di lavoro di grandi dimensioni appena rilasciate:

  • Dimensione dell'esecutore impostata durante la creazione dell'applicazione EMR Serverless = 8 vCPU, 60 GB di memoria, 200 GB di disco
  • Configurazione processo Spark:
    • spark.executor.cores = 8
    • spark.executor.memory = 54 g

Test 3 fallito con FetchFailedException, risultante dal fatto che la memoria dell'esecutore non era sufficiente per il lavoro.

Inoltre, dall'interfaccia utente Spark del test 3, vediamo che la memoria di archiviazione riservata degli esecutori è stata completamente utilizzata per la memorizzazione nella cache dei frame di dati.

I blocchi rimanenti nella cache sono stati riversati su disco, come si vede nell'esecutore stderr log:

23/02/06 16:06:58 INFO MemoryStore: Will not store rdd_4_1810
23/02/06 16:06:58 WARN MemoryStore: Not enough space to cache rdd_4_1810 in memory! (computed 134.1 MiB so far)
23/02/06 16:06:58 INFO MemoryStore: Memory use = 14.8 GiB (blocks) + 507.5 MiB (scratch space shared across 4 tasks(s)) = 15.3 GiB. Storage limit = 15.3 GiB.
23/02/06 16:06:58 WARN BlockManager: Persisting block rdd_4_1810 to disk instead.

Circa il 33% del frame di dati persistente è stato memorizzato nella cache su disco, come si vede sul file Archiviazione scheda dell'interfaccia utente di Spark.

Il test 4 con esecutori e vCore più grandi è stato eseguito correttamente senza generare errori relativi alla memoria. Inoltre, solo il 2.2% circa del frame di dati è stato memorizzato nella cache su disco. Pertanto, i blocchi memorizzati nella cache di un frame di dati verranno recuperati dalla memoria anziché dal disco, offrendo prestazioni migliori.

raccomandazioni

Per determinare se i lavoratori di grandi dimensioni andranno a vantaggio delle tue applicazioni Spark a uso intensivo di memoria, considera quanto segue:

  • Determina se la tua applicazione Spark presenta distorsioni di dati osservando l'interfaccia utente di Spark. Lo screenshot seguente dell'interfaccia utente di Spark mostra uno scenario di asimmetria dei dati di esempio in cui un'attività elabora la maggior parte dei dati (145.2 GB), osservando il Lettura casuale misurare. Se una o meno attività elaborano una quantità di dati significativamente maggiore rispetto ad altre attività, eseguire nuovamente l'applicazione con worker più grandi con 60-120 G di memoria (spark.executor.memory impostare ovunque da 54 a 109 GB factoring nel 10% di spark.executor.memoryOverhead).

  • Controlla il Archiviazione scheda di Spark History Server per esaminare il rapporto tra i dati memorizzati nella cache in memoria e il disco dal file Dimensioni in memoria ed Dimensione in disco colonne. Se più del 10% dei dati viene memorizzato nella cache su disco, eseguire nuovamente l'applicazione con worker più grandi per aumentare la quantità di dati memorizzati nella cache.
  • Un altro modo per determinare preventivamente se il tuo lavoro ha bisogno di più memoria è il monitoraggio Picco di memoria JVM sull'interfaccia utente di Spark esecutori scheda. Se la memoria JVM di picco utilizzata è vicina alla memoria dell'executor o del driver, è possibile creare un'applicazione con un worker più grande e configurare un valore più elevato per spark.executor.memory or spark.driver.memory. Ad esempio, nello screenshot seguente, il valore massimo del picco di utilizzo della memoria JVM è 26 GB e spark.executor.memory è impostato su 27 G. In questo caso, potrebbe essere vantaggioso utilizzare worker più grandi con 60 GB di memoria e spark.executor.memory impostato su 54 G.

Considerazioni

Sebbene le vCPU di grandi dimensioni aiutino ad aumentare la località dei blocchi di shuffle, sono coinvolti altri fattori come il throughput del disco, gli IOPS del disco (operazioni di input/output al secondo) e la larghezza di banda della rete. In alcuni casi, più piccoli worker con più dischi potrebbero offrire IOPS su disco, velocità effettiva e larghezza di banda di rete superiori in generale rispetto a meno worker di grandi dimensioni. Ti invitiamo a confrontare i tuoi carichi di lavoro con le configurazioni vCPU idonee per scegliere la configurazione migliore per il tuo carico di lavoro.

Per i lavori più pesanti, si consiglia di utilizzare dischi di grandi dimensioni. Puoi collegare fino a 200 GB di disco a ciascun nodo di lavoro quando crei la tua applicazione. Utilizzo di vCPU di grandi dimensioni (spark.executor.cores) per esecutore può aumentare l'utilizzo del disco su ciascun lavoratore. Se la tua applicazione ha esito negativo con "Nessuno spazio rimasto sul dispositivo" a causa dell'impossibilità di inserire i dati shuffle nel disco, utilizza lavoratori più piccoli con un disco da 200 GB.

Conclusione

In questo post, hai appreso i vantaggi dell'utilizzo di esecutori di grandi dimensioni per i tuoi lavori serverless EMR. Per ulteriori informazioni sulle diverse configurazioni dei lavoratori, fare riferimento a Configurazioni dei lavoratori. Le configurazioni di lavoro di grandi dimensioni sono disponibili in tutte le regioni in cui si trova EMR Serverless disponibile.


L'autore

Veena Vasudevan è un Senior Partner Solutions Architect e uno specialista di Amazon EMR presso AWS, specializzato in big data e analisi. Aiuta clienti e partner a creare soluzioni altamente ottimizzate, scalabili e sicure; modernizzare le loro architetture; e migrare i loro carichi di lavoro di big data su AWS.

Timestamp:

Di più da Big Data di AWS