Amazon EMR Serverless understøtter større arbejderstørrelser for at køre mere computer- og hukommelsesintensive arbejdsbelastninger

Amazon EMR Serverless understøtter større arbejderstørrelser for at køre mere computer- og hukommelsesintensive arbejdsbelastninger

Kildeknude: 1960092

Amazon EMR-serverløs giver dig mulighed for at køre open source big data frameworks såsom Apache Spark og Apache Hive uden at administrere klynger og servere. Med EMR Serverless kan du køre analytiske arbejdsbelastninger i enhver skala med automatisk skalering, der ændrer størrelse på ressourcer på få sekunder for at imødekomme skiftende datamængder og behandlingskrav. EMR Serverless skalerer automatisk ressourcer op og ned for at give den helt rigtige mængde kapacitet til din applikation.

Vi er glade for at kunne meddele, at EMR Serverless nu tilbyder arbejderkonfigurationer på 8 vCPU'er med op til 60 GB hukommelse og 16 vCPU'er med op til 120 GB hukommelse, hvilket giver dig mulighed for at køre mere computer- og hukommelsesintensive arbejdsbelastninger på EMR Serverless. En EMR Serverless-applikation bruger internt arbejdere til at udføre arbejdsbelastninger. og du kan konfigurere forskellige medarbejderkonfigurationer baseret på dine arbejdsbelastningskrav. Tidligere var den største arbejderkonfiguration tilgængelig på EMR Serverless 4 vCPU'er med op til 30 GB hukommelse. Denne egenskab er især gavnlig for følgende almindelige scenarier:

  • Blandtunge arbejdsbyrder
  • Hukommelsesintensive arbejdsbelastninger

Lad os se på hver af disse use cases og fordelene ved at have større arbejderstørrelser.

Fordele ved at bruge store arbejdere til shuffle-intensive arbejdsbelastninger

I Spark and Hive forekommer shuffle, når data skal omfordeles på tværs af klyngen under en beregning. Når din applikation udfører brede transformationer eller reducerer operationer som f.eks join, groupBy, sortBy eller repartition, Spark and Hive udløser en shuffle. Hver Spark-stadie og Tez-vertex er desuden afgrænset af en shuffle-operation. Tager vi Spark som et eksempel, er der som standard 200 partitioner for hvert Spark-job defineret af spark.sql.shuffle.partitions. Spark vil dog beregne antallet af opgaver i farten baseret på datastørrelsen og den handling, der udføres. Når en bred transformation udføres oven på et stort datasæt, kan der være GB'er eller endda TB'er af data, der skal hentes af alle opgaverne.

Blandinger er typisk dyre i forhold til både tid og ressourcer og kan føre til flaskehalse i ydeevnen. Derfor kan optimering af shuffles have en betydelig indflydelse på ydeevnen og omkostningerne ved et Spark-job. Med store arbejdere kan flere data allokeres til hver eksekutørs hukommelse, hvilket minimerer de data, der blandes på tværs af eksekutører. Dette fører igen til øget shuffle-læseydelse, fordi flere data vil blive hentet lokalt fra den samme arbejder, og færre data vil blive hentet eksternt fra andre arbejdere.

Eksperimenter

For at demonstrere fordelene ved at bruge store arbejdere til shuffle-intensive forespørgsler, lad os bruge q78 fra TPC-DS, som er en shuffle-tung Spark-forespørgsel, der blander 167 GB data over 12 Spark-trin. Lad os udføre to iterationer af den samme forespørgsel med forskellige konfigurationer.

Konfigurationerne for test 1 er som følger:

  • Størrelse på executor anmodet under oprettelse af EMR Serverless-applikation = 4 vCPU'er, 8 GB hukommelse, 200 GB disk
  • Spark job konfiguration:
    • spark.executor.cores = 4
    • spark.executor.memory = 8
    • spark.executor.instances = 48
    • Parallelisme = 192 (spark.executor.instances * spark.executor.cores)

Konfigurationerne for test 2 er som følger:

  • Størrelse på executor anmodet under oprettelse af EMR Serverless-applikation = 8 vCPU'er, 16 GB hukommelse, 200 GB disk
  • Spark job konfiguration:
    • spark.executor.cores = 8
    • spark.executor.memory = 16
    • spark.executor.instances = 24
    • Parallelisme = 192 (spark.executor.instances * spark.executor.cores)

Lad os også deaktivere dynamisk tildeling ved at indstille spark.dynamicAllocation.enabled til false for begge tests for at undgå potentiel støj på grund af variable executor-lanceringstider og holde ressourceudnyttelsen ensartet for begge tests. Vi bruger Gnistmål, som er et open source-værktøj, der forenkler indsamling og analyse af Spark-performancemålinger. Fordi vi bruger et fast antal executors, er det samlede antal vCPU'er og hukommelse, der anmodes om, det samme for begge tests. Følgende tabel opsummerer observationerne fra de målinger, der er indsamlet med Spark Measure.

. Samlet tid taget for forespørgsel i millisekunder shuffleLocalBlocksHentet shuffleRemoteBlocksHentet shuffleLocalBytesRead shuffleRemoteBytesRead shuffleFetchWaitTime shuffleWriteTime
Test 1 153244 114175 5291825 3.5 DK 163.1 DK 1.9 hr 4.7 min
Test 2 108136 225448 5185552 6.9 DK 159.7 DK 3.2 min 5.2 min

Som det ses af tabellen, er der en betydelig forskel i ydeevne på grund af shuffle-forbedringer. Test 2, med halvdelen af ​​antallet af eksekvere, der er dobbelt så stort som Test 1, kørte 29.44 % hurtigere, med 1.97 gange flere shuffle-data hentet lokalt sammenlignet med Test 1 for den samme forespørgsel, samme parallelitet og samme samlede vCPU og hukommelsesressourcer . Derfor kan du drage fordel af forbedret ydeevne uden at gå på kompromis med omkostninger eller jobparallelitet ved hjælp af store udførende. Vi har observeret lignende ydeevnefordele for andre shuffle-intensive TPC-DS-forespørgsler som f.eks q23a , q23b.

Anbefalinger

For at afgøre, om de store arbejdere vil gavne dine shuffle-intensive Spark-applikationer, skal du overveje følgende:

  • Tjek Praktikophold fanen fra Spark History Server UI i din EMR Serverless-applikation. Fra det følgende skærmbillede af Spark History Server kan vi f.eks. fastslå, at dette Spark-job skrev og læste 167 GB shuffle-data samlet på tværs af 12 faser, idet vi ser på Bland Læs , Bland Skriv kolonner. Hvis dine job blander over 50 GB data, kan du muligvis drage fordel af at bruge større arbejdere med 8 eller 16 vCPU'er eller spark.executor.cores.

  • Tjek SQL / DataFrame fanen fra Spark History Server UI i din EMR Serverless-applikation (kun for Dataframe og Dataset API'er). Når du vælger den udførte Spark-handling, såsom collect, take, showString eller save, vil du se en aggregeret DAG for alle stadier adskilt af udvekslingerne. Hver udveksling i DAG'en svarer til en shuffle-operation, og den vil indeholde de lokale og eksterne bytes og blokke blandet, som det ses i det følgende skærmbillede. Hvis de lokale shuffle-blokke eller hentede bytes er meget færre sammenlignet med de hentede fjernblokke eller bytes, kan du køre din applikation igen med større arbejdere (med 8 eller 16 vCPU'er eller spark.executor.cores) og gennemgå disse udvekslingsmetrikker i en DAG for at se om der er nogen forbedring.

  • Brug Gnistmål værktøj med din Spark-forespørgsel for at få shuffle-metrics i Spark-driverens stdout logs, som vist i den følgende log for et Spark-job. Gennemgå den tid, det tager at blande læsninger (shuffleFetchWaitTime) og bland skriver (shuffleWriteTime), og forholdet mellem de hentede lokale bytes og de hentede fjernbytes. Hvis shuffle-operationen tager mere end 2 minutter, skal du køre din applikation igen med større arbejdere (med 8 eller 16 vCPU'er eller spark.executor.cores) med Spark Measure for at spore forbedringen i shuffle-ydeevne og den samlede jobkørselstid.
Time taken: 177647 ms Scheduling mode = FIFO
Spark Context default degree of parallelism = 192 Aggregated Spark stage metrics:
numStages => 22
numTasks => 10156
elapsedTime => 159894 (2.7 min)
stageDuration => 456893 (7.6 min)
executorRunTime => 28418517 (7.9 h)
executorCpuTime => 20276736 (5.6 h)
executorDeserializeTime => 326486 (5.4 min)
executorDeserializeCpuTime => 124323 (2.1 min)
resultSerializationTime => 534 (0.5 s)
jvmGCTime => 648809 (11 min)
shuffleFetchWaitTime => 340880 (5.7 min)
shuffleWriteTime => 245918 (4.1 min)
resultSize => 23199434 (22.1 MB)
diskBytesSpilled => 0 (0 Bytes)
memoryBytesSpilled => 0 (0 Bytes)
peakExecutionMemory => 1794288453176
recordsRead => 18696929278
bytesRead => 77354154397 (72.0 GB)
recordsWritten => 0
bytesWritten => 0 (0 Bytes)
shuffleRecordsRead => 14124240761
shuffleTotalBlocksFetched => 5571316
shuffleLocalBlocksFetched => 117321
shuffleRemoteBlocksFetched => 5453995
shuffleTotalBytesRead => 158582120627 (147.7 GB)
shuffleLocalBytesRead => 3337930126 (3.1 GB)
shuffleRemoteBytesRead => 155244190501 (144.6 GB)
shuffleRemoteBytesReadToDisk => 0 (0 Bytes)
shuffleBytesWritten => 156913371886 (146.1 GB)
shuffleRecordsWritten => 13867102620

Fordele ved at bruge store arbejdere til hukommelsesintensive arbejdsbelastninger

Visse typer arbejdsbelastninger er hukommelsesintensive og kan drage fordel af mere hukommelse konfigureret pr. medarbejder. I dette afsnit diskuterer vi almindelige scenarier, hvor store arbejdere kan være gavnlige til at køre hukommelsesintensive arbejdsbelastninger.

Data skævhed

Dataskævheder forekommer almindeligvis i flere typer datasæt. Nogle almindelige eksempler er afsløring af svindel, befolkningsanalyse og indkomstfordeling. For eksempel, når du vil opdage uregelmæssigheder i dine data, forventes det, at kun mindre end 1 % af dataene er unormale. Hvis du ønsker at udføre en vis aggregering oven på normale vs. unormale poster, vil 99 % af dataene blive behandlet af en enkelt arbejder, hvilket kan føre til, at den pågældende arbejder løber tør for hukommelse. Dataskævheder kan observeres for hukommelsesintensive transformationer som f.eks groupBy, orderBy, join, vinduesfunktioner, collect_list, collect_set, og så videre. Sammenføj typer som f.eks BroadcastNestedLoopJoin og Cartesan-produkter er også i sagens natur hukommelsesintensive og modtagelige for dataskævheder. På samme måde, hvis dine inputdata er Gzip-komprimerede, kan en enkelt Gzip-fil ikke læses af mere end én opgave, fordi Gzip-komprimeringstypen ikke kan opdeles. Når der er et par meget store Gzip-filer i inputtet, kan dit job løbe tør for hukommelse, fordi en enkelt opgave måske skal læse en enorm Gzip-fil, der ikke passer i eksekveringshukommelsen.

Fejl på grund af dataskævhed kan afbødes ved at anvende strategier såsom saltning. Dette kræver dog ofte omfattende ændringer af koden, hvilket måske ikke er muligt for en produktionsbelastning, der fejlede på grund af en hidtil uset dataskævhed forårsaget af en pludselig stigning i indgående datamængde. For en enklere løsning vil du måske bare øge arbejderhukommelsen. Brug af større arbejdere med flere spark.executor.memory giver dig mulighed for at håndtere dataskævhed uden at foretage ændringer i din applikationskode.

Caching

For at forbedre ydeevnen giver Spark dig mulighed for at cache datarammer, datasæt og RDD'er i hukommelsen. Dette giver dig mulighed for at genbruge en dataramme flere gange i din applikation uden at skulle genberegne den. Som standard bruges op til 50 % af din executors JVM til at cache datarammerne baseret på property spark.memory.storageFraction. For eksempel, hvis din spark.executor.memory er indstillet til 30 GB, så bruges 15 GB til cachelagring, der er immun over for udsættelse.

Standardlagerniveauet for cachedrift er DISK_AND_MEMORY. Hvis størrelsen på den dataramme, du forsøger at cache, ikke passer i eksekverens hukommelse, spildes en del af cachen til disken. Hvis der ikke er plads nok til at skrive de cachelagrede data på disken, bliver blokkene smidt ud, og du får ikke fordelene ved at cache. Brug af større arbejdere giver dig mulighed for at cache flere data i hukommelsen, hvilket øger jobydeevnen ved at hente cachelagrede blokke fra hukommelsen i stedet for det underliggende lager.

Eksperimenter

For eksempel følgende PySpark job fører til en skævhed, hvor én eksekutør behandler 99.95 % af dataene med hukommelsesintensive aggregater som f.eks. collect_list. Jobbet cacherer også en meget stor dataramme (2.2 TB). Lad os køre to iterationer af det samme job på EMR Serverless med følgende vCPU og hukommelseskonfigurationer.

Lad os køre test 3 med de hidtil størst mulige arbejderkonfigurationer:

  • Størrelse på eksekveringssæt under oprettelse af EMR-serverløs applikation = 4 vCPU'er, 30 GB hukommelse, 200 GB disk
  • Spark job konfiguration:
    • spark.executor.cores = 4
    • spark.executor.memory = 27 G

Lad os køre Test 4 med de nyligt udgivne store arbejderkonfigurationer:

  • Størrelse på executor indstillet under oprettelse af EMR-serverløs applikation = 8 vCPU'er, 60 GB hukommelse, 200 GB disk
  • Spark job konfiguration:
    • spark.executor.cores = 8
    • spark.executor.memory = 54 G

Test 3 mislykkedes med FetchFailedException, hvilket resulterede på grund af at eksekveringshukommelsen ikke var tilstrækkelig til jobbet.

Fra Spark-brugergrænsefladen i Test 3 ser vi også, at eksekverernes reserverede lagerhukommelse blev fuldt udnyttet til at cache datarammerne.

De resterende blokke til cache blev spildt på disk, som det ses i eksekverens stderr logfiler:

23/02/06 16:06:58 INFO MemoryStore: Will not store rdd_4_1810
23/02/06 16:06:58 WARN MemoryStore: Not enough space to cache rdd_4_1810 in memory! (computed 134.1 MiB so far)
23/02/06 16:06:58 INFO MemoryStore: Memory use = 14.8 GiB (blocks) + 507.5 MiB (scratch space shared across 4 tasks(s)) = 15.3 GiB. Storage limit = 15.3 GiB.
23/02/06 16:06:58 WARN BlockManager: Persisting block rdd_4_1810 to disk instead.

Omkring 33 % af den vedvarende dataramme blev cachelagret på disken, som det ses på Opbevaring fanen i Spark UI.

Test 4 med større eksekvere og vCores kørte med succes uden at kaste nogen hukommelsesrelaterede fejl. Desuden blev kun omkring 2.2% af datarammen cachelagret på disken. Derfor vil cachede blokke af en dataramme blive hentet fra hukommelsen i stedet for fra disken, hvilket giver bedre ydeevne.

Anbefalinger

For at afgøre, om de store arbejdere vil gavne dine hukommelsesintensive Spark-applikationer, skal du overveje følgende:

  • Find ud af, om din Spark-applikation har nogen dataskævheder ved at se på Spark-brugergrænsefladen. Følgende skærmbillede af Spark-brugergrænsefladen viser et eksempel på dataskævscenario, hvor en opgave behandler det meste af dataene (145.2 GB) og ser på Bland Læs størrelse. Hvis en eller færre opgaver behandler væsentligt flere data end andre opgaver, skal du køre din applikation igen med større arbejdere med 60-120 G hukommelse (spark.executor.memory indstillet alt fra 54-109 GB med 10 % af spark.executor.memoryOverhead).

  • Tjek Opbevaring fanen på Spark History Server for at gennemgå forholdet mellem cachelagrede data i hukommelsen og disken fra Størrelse i hukommelsen , Størrelse i disk kolonner. Hvis mere end 10 % af dine data er cachelagret på disken, skal du køre dit program igen med større arbejdere for at øge mængden af ​​data, der er gemt i hukommelsen.
  • En anden måde at forebyggende afgøre, om dit job har brug for mere hukommelse, er ved at overvåge Peak JVM-hukommelse på Spark UI Eksekutører fanen. Hvis den anvendte peak JVM-hukommelse er tæt på executor- eller driverhukommelsen, kan du oprette en applikation med en større arbejder og konfigurere en højere værdi for spark.executor.memory or spark.driver.memory. For eksempel, i det følgende skærmbillede er den maksimale værdi af peak JVM-hukommelsesbrug 26 GB og spark.executor.memory er indstillet til 27 G. I dette tilfælde kan det være en fordel at bruge større arbejdere med 60 GB hukommelse og spark.executor.memory indstillet til 54 G.

Overvejelser

Selvom store vCPU'er hjælper med at øge placeringen af ​​shuffle-blokkene, er der andre faktorer involveret, såsom diskgennemstrømning, disk IOPS (input/output operationer per sekund) og netværksbåndbredde. I nogle tilfælde kunne flere små arbejdere med flere diske tilbyde højere disk IOPS, gennemløb og netværksbåndbredde samlet set sammenlignet med færre store arbejdere. Vi opfordrer dig til at benchmarke dine arbejdsbelastninger mod passende vCPU-konfigurationer for at vælge den bedste konfiguration til din arbejdsbyrde.

Til shuffle-tunge job anbefales det at bruge store diske. Du kan vedhæfte op til 200 GB disk til hver medarbejder, når du opretter din applikation. Brug af store vCPU'er (spark.executor.cores) pr. executor kan øge diskudnyttelsen på hver arbejder. Hvis din applikation fejler med "Ingen plads tilbage på enheden" på grund af manglende evne til at passe shuffle-data på disken, skal du bruge flere mindre arbejdere med 200 GB disk.

Konklusion

I dette indlæg lærte du om fordelene ved at bruge store eksekutører til dine EMR-serverløse job. For mere information om forskellige arbejderkonfigurationer, se Arbejderkonfigurationer. Store arbejderkonfigurationer er tilgængelige i alle regioner, hvor EMR Serverless er til rådighed.


Om forfatteren

Veena Vasudevan er Senior Partner Solutions Architect og Amazon EMR-specialist hos AWS med fokus på big data og analytics. Hun hjælper kunder og partnere med at bygge meget optimerede, skalerbare og sikre løsninger; modernisere deres arkitekturer; og migrere deres big data-arbejdsbelastninger til AWS.

Tidsstempel:

Mere fra AWS Big Data