Az Amazon EMR Serverless támogatja a nagyobb dolgozói méreteket, hogy több számítási és memóriaigényes munkaterhelést tudjon futtatni

Az Amazon EMR Serverless támogatja a nagyobb dolgozói méreteket, hogy több számítási és memóriaigényes munkaterhelést tudjon futtatni

Forrás csomópont: 1960092

Amazon EMR szerver nélküli lehetővé teszi a nyílt forráskódú big data keretrendszerek, például az Apache Spark és az Apache Hive futtatását fürtök és kiszolgálók kezelése nélkül. Az EMR Serverless segítségével bármilyen léptékben futtathatja az analitikai munkaterheléseket az automatikus skálázással, amely másodpercek alatt átméretezi az erőforrásokat, hogy megfeleljen a változó adatmennyiségeknek és feldolgozási követelményeknek. Az EMR Serverless automatikusan felfelé és lefelé méretezi az erőforrásokat, hogy az alkalmazásnak megfelelő kapacitást biztosítson.

Örömmel jelentjük be, hogy az EMR Serverless immár 8 vCPU-ból álló dolgozói konfigurációkat kínál akár 60 GB memóriával és 16 vCPU-t akár 120 GB memóriával, így több számítási és memóriaigényes munkaterhelést futtathat az EMR Serverless rendszeren. Az EMR Serverless alkalmazás belsőleg dolgozókat használ a munkaterhelések végrehajtásához. és a munkaterhelési követelményei alapján különböző dolgozói konfigurációkat konfigurálhat. Korábban az EMR Serverlessen elérhető legnagyobb dolgozói konfiguráció 4 vCPU volt, akár 30 GB memóriával. Ez a képesség különösen előnyös a következő gyakori forgatókönyvek esetén:

  • Nehéz munkaterhelések keverése
  • Memóriaigényes munkaterhelések

Nézzük meg ezeket a használati eseteket és a nagyobb dolgozói létszám előnyeit.

Előnyök, ha nagy létszámú munkásokat alkalmaznak az intenzív munkaterhelésekhez

A Spark and Hive alkalmazásban a keverés akkor történik, amikor az adatokat újra kell osztani a fürtben a számítás során. Amikor az alkalmazás széles körű átalakításokat hajt végre, vagy csökkenti a műveleteket, mint pl join, groupBy, sortByvagy repartition, A Spark and Hive keverést indít el. Ezenkívül minden Spark szakaszt és Tez csúcsot egy keverési művelet határol. A Sparkot példaként véve alapértelmezés szerint 200 partíció van minden által meghatározott Spark-feladathoz spark.sql.shuffle.partitions. A Spark azonban menet közben kiszámolja a feladatok számát az adatméret és a végrehajtott művelet alapján. Ha egy nagy adatkészleten széles körű átalakítást hajtanak végre, előfordulhat, hogy GB-nyi vagy akár TB-nyi adatot kell lekérnie az összes feladatnak.

A keverés általában költséges mind az idő, mind az erőforrások tekintetében, és teljesítménybeli szűk keresztmetszetek kialakulásához vezethet. Ezért a keverés optimalizálása jelentős hatással lehet egy Spark-feladat teljesítményére és költségére. Nagy munkások esetén több adatot lehet lefoglalni az egyes végrehajtók memóriájába, ami minimalizálja a végrehajtók között kevert adatok számát. Ez viszont megnövekedett véletlenszerű olvasási teljesítményhez vezet, mivel a rendszer több adatot kér le helyben ugyanattól a dolgozótól, és kevesebb adatot fog lekérni távolról a többi dolgozótól.

kísérletek

Hogy bemutassuk, milyen előnyökkel jár a nagy létszámú dolgozók használata intenzív kevert lekérdezésekhez, használjuk q78 a TPC-DS-től, amely egy sok keverést igénylő Spark-lekérdezés, amely 167 GB adatot kever meg 12 Spark-fokozaton keresztül. Végezzünk el két iterációt ugyanannak a lekérdezésnek, különböző konfigurációkkal.

Az 1. teszt konfigurációi a következők:

  • Az EMR szerver nélküli alkalmazás létrehozásakor kért végrehajtó mérete = 4 vCPU, 8 GB memória, 200 GB lemez
  • Spark job konfiguráció:
    • spark.executor.cores = 4
    • spark.executor.memory = 8
    • spark.executor.instances = 48
    • Párhuzam = 192 (spark.executor.instances * spark.executor.cores)

Az 2. teszt konfigurációi a következők:

  • Az EMR szerver nélküli alkalmazás létrehozásakor kért végrehajtó mérete = 8 vCPU, 16 GB memória, 200 GB lemez
  • Spark job konfiguráció:
    • spark.executor.cores = 8
    • spark.executor.memory = 16
    • spark.executor.instances = 24
    • Párhuzam = 192 (spark.executor.instances * spark.executor.cores)

A dinamikus kiosztást is tiltsuk beállítással spark.dynamicAllocation.enabled nak nek false mindkét teszthez, hogy elkerülje a változó végrehajtó indítási idők miatti esetleges zajt, és konzisztens legyen az erőforrás-kihasználás mindkét tesztnél. Használjuk Spark Measure, amely egy nyílt forráskódú eszköz, amely leegyszerűsíti a Spark teljesítménymutatóinak gyűjtését és elemzését. Mivel fix számú végrehajtót használunk, a vCPU-k és a kért memória teljes száma mindkét tesztnél azonos. A következő táblázat összefoglalja a Spark Measure segítségével gyűjtött metrikák megfigyeléseit.

. A lekérdezéshez eltöltött teljes idő ezredmásodpercben shuffleLocalBlocksFetched shuffleRemoteBlocksFetched shuffleLocalBytesRead shuffleRemoteBytesRead shuffleFetchWaitTime shuffleWriteTime
Teszt 1 153244 114175 5291825 3.5 GB 163.1 GB 1.9 hr 4.7 min
Teszt 2 108136 225448 5185552 6.9 GB 159.7 GB 3.2 min 5.2 min

Amint a táblázatból látható, jelentős különbségek vannak a teljesítményben a keverési fejlesztések miatt. A 2. teszt, feleannyi végrehajtóval, amelyek kétszer akkoraak, mint az 1. teszt, 29.44%-kal gyorsabban futott, és 1.97-szer több keverési adatot vettek le helyben az 1. teszthez képest ugyanazon lekérdezés, azonos párhuzamosság, valamint azonos összesített vCPU és memória erőforrások esetén. . Ezért profitálhat a jobb teljesítményből anélkül, hogy kompromisszumot kötne a költségekben vagy a munka párhuzamosságában a nagy végrehajtók segítségével. Hasonló teljesítményelőnyöket figyeltünk meg más, keverést igénylő TPC-DS lekérdezéseknél is, mint pl q23a és a q23b.

ajánlások

Annak megállapításához, hogy a nagy munkások hasznot húznak-e az intenzív keverést igénylő Spark-alkalmazásaiból, vegye figyelembe a következőket:

  • Ellenőrizd a Szakmai lapon az EMR Serverless alkalmazás Spark History Server felhasználói felületén. Például a Spark History Server alábbi képernyőképe alapján megállapíthatjuk, hogy ez a Spark-feladat 167 GB véletlenszerű adatot írt és olvasott el 12 szakaszon keresztül, a Véletlenszerű olvasás és a Véletlenszerű írás oszlopok. Ha a munkái több mint 50 GB adatot kevernek, akkor előnyös lehet a nagyobb dolgozók alkalmazása 8 vagy 16 vCPU-val vagy spark.executor.cores.

  • Ellenőrizd a SQL / DataFrame fület az EMR Serverless alkalmazás Spark History Server felhasználói felületéről (csak Dataframe és Dataset API-k esetén). Amikor kiválasztja a végrehajtott Spark-műveletet, mint például a gyűjtés, az átvétel, a karakterlánc megjelenítése vagy a mentés, egy összesített DAG-t fog látni a cserék által elválasztott összes szakaszhoz. A DAG-ban minden csere egy keverési műveletnek felel meg, és tartalmazza a helyi és távoli bájtokat és blokkokat megkeverve, ahogy az a következő képernyőképen látható. Ha a lekért helyi keverési blokkok vagy bájtok mennyisége sokkal kevesebb a távoli blokkokhoz vagy bájtokhoz képest, akkor újrafuttathatja az alkalmazást nagyobb dolgozókkal (8 vagy 16 vCPU-val vagy spark.executor.cores), és áttekintheti ezeket a cseremetrikákat egy DAG-ban. hátha van valami javulás.

  • Használja a Spark Measure eszközt a Spark-lekérdezéssel, hogy megkapja a keverési mérőszámokat a Spark illesztőprogramjában stdout naplók, amint az a következő naplóban látható egy Spark-feladathoz. Tekintse át a véletlenszerű olvasáshoz szükséges időt (shuffleFetchWaitTime) és a shuffle írja (shuffleWriteTime), valamint a lekért helyi bájtok aránya a lekért távoli bájtokhoz képest. Ha a keverési művelet 2 percnél tovább tart, futtassa újra az alkalmazást nagyobb dolgozókkal (8 vagy 16 vCPU-val vagy spark.executor.cores) a Spark Measure segítségével a véletlen sorrendű lejátszási teljesítmény és a teljes munkaidő javulásának nyomon követéséhez.
Time taken: 177647 ms Scheduling mode = FIFO
Spark Context default degree of parallelism = 192 Aggregated Spark stage metrics:
numStages => 22
numTasks => 10156
elapsedTime => 159894 (2.7 min)
stageDuration => 456893 (7.6 min)
executorRunTime => 28418517 (7.9 h)
executorCpuTime => 20276736 (5.6 h)
executorDeserializeTime => 326486 (5.4 min)
executorDeserializeCpuTime => 124323 (2.1 min)
resultSerializationTime => 534 (0.5 s)
jvmGCTime => 648809 (11 min)
shuffleFetchWaitTime => 340880 (5.7 min)
shuffleWriteTime => 245918 (4.1 min)
resultSize => 23199434 (22.1 MB)
diskBytesSpilled => 0 (0 Bytes)
memoryBytesSpilled => 0 (0 Bytes)
peakExecutionMemory => 1794288453176
recordsRead => 18696929278
bytesRead => 77354154397 (72.0 GB)
recordsWritten => 0
bytesWritten => 0 (0 Bytes)
shuffleRecordsRead => 14124240761
shuffleTotalBlocksFetched => 5571316
shuffleLocalBlocksFetched => 117321
shuffleRemoteBlocksFetched => 5453995
shuffleTotalBytesRead => 158582120627 (147.7 GB)
shuffleLocalBytesRead => 3337930126 (3.1 GB)
shuffleRemoteBytesRead => 155244190501 (144.6 GB)
shuffleRemoteBytesReadToDisk => 0 (0 Bytes)
shuffleBytesWritten => 156913371886 (146.1 GB)
shuffleRecordsWritten => 13867102620

A nagy létszámú munkások használatának előnyei nagy memóriaigényes munkaterhelések esetén

Bizonyos típusú munkaterhelések memóriaigényesek, és előnyös lehet a dolgozónként konfigurált több memória. Ebben a részben olyan gyakori forgatókönyveket tárgyalunk, amelyekben a nagy dolgozók hasznosak lehetnek a memóriaigényes munkaterhelések futtatásához.

Adatok torzítása

Az adattorzulás általában többféle adatkészletben fordul elő. Néhány gyakori példa a csalások felderítése, a népességelemzés és a jövedelemelosztás. Ha például anomáliákat szeretne észlelni az adatokban, akkor várhatóan csak az adatok kevesebb, mint 1%-a abnormális. Ha valamilyen összesítést szeretne végrehajtani a normál vagy abnormális rekordokon felül, az adatok 99%-át egyetlen dolgozó dolgozza fel, ami ahhoz vezethet, hogy a dolgozónak elfogy a memóriája. Adattorzulás figyelhető meg az olyan memóriaigényes átalakításoknál, mint pl groupBy, orderBy, join, ablak funkciók, collect_list, collect_set, stb. Csatlakozási típusok, mint pl BroadcastNestedLoopJoin és a Cartesan termék is eleve memóriaigényes és érzékeny az adatok torzítására. Hasonlóképpen, ha a bemeneti adatok Gzip-tömörítésűek, egyetlen Gzip-fájlt egynél több feladat nem tud elolvasni, mivel a Gzip-tömörítési típus feloszthatatlan. Ha néhány nagyon nagy Gzip fájl van a bemenetben, előfordulhat, hogy a feladatnak elfogy a memóriája, mert előfordulhat, hogy egyetlen feladatnak egy hatalmas Gzip fájlt kell beolvasnia, amely nem fér el a végrehajtó memóriájában.

Az adatok torzulásából eredő hibák enyhíthetők olyan stratégiák alkalmazásával, mint például a sózás. Ez azonban gyakran jelentős módosításokat tesz szükségessé a kódban, ami nem biztos, hogy megvalósítható olyan termelési munkaterhelés esetén, amely a bejövő adatmennyiség hirtelen megugrása által okozott soha nem látott adattorzulás miatt meghiúsult. Az egyszerűbb megoldás érdekében érdemes lehet növelni a dolgozói memóriát. Nagyobb dolgozók használata többel spark.executor.memory lehetővé teszi az adatok torzításának kezelését anélkül, hogy bármilyen változtatást végezne az alkalmazás kódjában.

gyorsítótárral

A teljesítmény javítása érdekében a Spark lehetővé teszi az adatkeretek, adatkészletek és RDD-k gyorsítótárazását a memóriában. Ez lehetővé teszi egy adatkeret többszöri újrafelhasználását az alkalmazásban anélkül, hogy újra kellene számolnia. Alapértelmezés szerint a végrehajtó JVM-jének legfeljebb 50%-át használják az adatkeretek gyorsítótárazására a property spark.memory.storageFraction. Például, ha a spark.executor.memory 30 GB-ra van állítva, majd 15 GB-ot használ a gyorsítótár tárolására, amely immunis a kilakoltatásra.

A gyorsítótár működésének alapértelmezett tárolási szintje a DISK_AND_MEMORY. Ha a gyorsítótárba helyezni kívánt adatkeret mérete nem fér el a végrehajtó memóriájában, a gyorsítótár egy része a lemezre kerül. Ha nincs elég hely a gyorsítótárazott adatok lemezre írásához, a blokkok kiürülnek, és Ön nem élvezheti a gyorsítótárazás előnyeit. A nagyobb dolgozók használata lehetővé teszi több adat gyorsítótárazását a memóriában, ami növeli a munka teljesítményét azáltal, hogy a gyorsítótárazott blokkokat kéri le a memóriából, nem pedig az alapul szolgáló tárolóból.

kísérletek

Például a következőket PySpark munka torzuláshoz vezet, mivel egy végrehajtó az adatok 99.95%-át olyan memóriaigényes aggregátumokkal dolgozza fel, mint pl. collect_list. A feladat egy nagyon nagy adatkeretet is gyorsítótáraz (2.2 TB). Futtassuk ugyanazt a feladatot két iterációban az EMR Serverless rendszeren a következő vCPU- és memóriakonfigurációkkal.

Futtassuk a 3. tesztet a korábban lehetséges legnagyobb dolgozói konfigurációkkal:

  • Az EMR szerver nélküli alkalmazás létrehozásakor beállított végrehajtó mérete = 4 vCPU, 30 GB memória, 200 GB lemez
  • Spark job konfiguráció:
    • spark.executor.cores = 4
    • spark.executor.memory = 27 G

Futtassuk a 4. tesztet az újonnan kiadott nagy munkavégző konfigurációkkal:

  • Az EMR szerver nélküli alkalmazás létrehozásakor beállított végrehajtó mérete = 8 vCPU, 60 GB memória, 200 GB lemez
  • Spark job konfiguráció:
    • spark.executor.cores = 8
    • spark.executor.memory = 54 G

A 3. teszt meghiúsult FetchFailedException, ami azért következett be, mert a végrehajtó memória nem volt elegendő a feladathoz.

A 3. teszt Spark UI-jából azt is láthatjuk, hogy a végrehajtók lefoglalt tárolómemóriáját teljes mértékben kihasználták az adatkeretek gyorsítótárazására.

A gyorsítótárban lévő többi blokk kikerült a lemezre, amint az a végrehajtónál látható stderr rönk:

23/02/06 16:06:58 INFO MemoryStore: Will not store rdd_4_1810
23/02/06 16:06:58 WARN MemoryStore: Not enough space to cache rdd_4_1810 in memory! (computed 134.1 MiB so far)
23/02/06 16:06:58 INFO MemoryStore: Memory use = 14.8 GiB (blocks) + 507.5 MiB (scratch space shared across 4 tasks(s)) = 15.3 GiB. Storage limit = 15.3 GiB.
23/02/06 16:06:58 WARN BlockManager: Persisting block rdd_4_1810 to disk instead.

A megőrzött adatkeret körülbelül 33%-a gyorsítótárban volt a lemezen, amint az a képen látható Tárolás a Spark UI lapján.

A 4. teszt nagyobb végrehajtókkal és vCore-okkal sikeresen lefutott anélkül, hogy memóriával kapcsolatos hibákat dobott volna fel. Ezenkívül az adatkeretnek csak körülbelül 2.2%-a került gyorsítótárba a lemezre. Ezért az adatkeret gyorsítótárazott blokkjait a rendszer a memóriából fogja lekérni, nem pedig a lemezről, ami jobb teljesítményt nyújt.

ajánlások

Annak megállapításához, hogy a nagy munkások hasznosak-e a memóriaigényes Spark-alkalmazások számára, vegye figyelembe a következőket:

  • A Spark felhasználói felület megtekintésével állapítsa meg, hogy a Spark-alkalmazásban vannak-e adatok torzulásai. A következő képernyőképe a Spark UI-ról egy példa-adattorzítási forgatókönyvet mutat be, ahol egy feladat feldolgozza az adatok nagy részét (145.2 GB), a Véletlenszerű olvasás méret. Ha egy vagy több feladat lényegesen több adatot dolgoz fel, mint más feladatok, futtassa újra az alkalmazást nagyobb dolgozókkal 60–120 G memóriával (spark.executor.memory 54 és 109 GB között állítható be, 10%-kal együtt spark.executor.memoryOverhead).

  • Ellenőrizd a Tárolás A Spark History Server lapján megtekintheti a memóriában tárolt adatok és a lemezről tárolt adatok arányát Méret a memóriában és a Méret a lemezen oszlopok. Ha az adatok több mint 10%-a gyorsítótárban van a lemezen, futtassa újra az alkalmazást nagyobb dolgozókkal, hogy növelje a memóriában tárolt adatok mennyiségét.
  • A megfigyelés egy másik módja annak, hogy előre meg lehessen állapítani, hogy a munkának több memóriára van-e szüksége Csúcs JVM memória a Spark UI-n Végrehajtók lapon. Ha a használt JVM-memória csúcsértéke közel van a végrehajtó vagy az illesztőprogram memóriájához, létrehozhat egy alkalmazást egy nagyobb dolgozóval, és magasabb értéket konfigurálhat spark.executor.memory or spark.driver.memory. Például a következő képernyőképen a maximális JVM memóriahasználat 26 GB és spark.executor.memory 27 G-ra van állítva. Ebben az esetben előnyös lehet nagyobb dolgozók használata 60 GB memóriával és spark.executor.memory állítsa 54 G-ra.

Szempontok

Bár a nagy vCPU-k segítenek növelni a keverési blokkok helyét, más tényezők is szerepet játszanak, mint például a lemez átviteli sebessége, a lemez IOPS (bemeneti/kimeneti műveletek másodpercenként) és a hálózati sávszélesség. Egyes esetekben több, több lemezzel rendelkező kisebb dolgozó nagyobb IOPS-t, átviteli sebességet és összességében hálózati sávszélességet kínálhat, mint kevesebb nagy dolgozó. Javasoljuk, hogy vesse össze munkaterhelését a megfelelő vCPU-konfigurációkkal, hogy kiválaszthassa a munkaterheléséhez legjobban illő konfigurációt.

A sok keverést igénylő munkákhoz nagy lemezek használata javasolt. Az alkalmazás létrehozásakor minden egyes dolgozóhoz akár 200 GB-os lemezt is csatolhat. Nagy vCPU-k használata (spark.executor.cores) végrehajtónként növelheti a lemezkihasználást az egyes dolgozókon. Ha az alkalmazás a „Nincs hely az eszközön” beállítással meghiúsul, mert nem fér el a kevert adatok a lemezen, használjon több kisebb dolgozót 200 GB-os lemezzel.

Következtetés

Ebben a bejegyzésben megismerkedhetett a nagy végrehajtók használatának előnyeivel az EMR-kiszolgáló nélküli munkáihoz. A különböző dolgozói konfigurációkkal kapcsolatos további információkért lásd: Munkavállalói konfigurációk. A nagy dolgozói konfigurációk minden olyan régióban elérhetők, ahol az EMR Serverless működik elérhető.


A szerzőről

Veena Vasudevan Senior Partner Solutions Architect és Amazon EMR specialista az AWS-nél, aki a big data-ra és az elemzésekre összpontosít. Segít ügyfeleinek és partnereinek rendkívül optimalizált, méretezhető és biztonságos megoldások kidolgozásában; modernizálják építészetüket; és áttelepítik a nagy adatforgalmat az AWS-re.

Időbélyeg:

Még több AWS Big Data