Ez a cikk részeként jelent meg Adattudományi Blogaton
Bevezetés
Az Apache Spark egy nagy adatfeldolgozási keretrendszer, amely régóta az egyik legnépszerűbb és leggyakrabban előforduló, a Big Data-hoz kapcsolódó projektekben. Sikeresen ötvözi a munka gyorsaságát és a fejlesztő gondolatait kifejező egyszerűségét.
A fejlesztő kellően magas szinten dolgozik az adatokkal, és úgy tűnik, nincs semmi nehézség abban, hogy például két adathalmazt egyetlen kódsor írásával összekapcsoljon:
ordersDF.join(customersDF, ordersDF ["customer_id"] == ügyfelekDF["id"], "left_outer")
De gondoljunk csak bele: mi történik egy fürtben, ha két olyan adatkészletet kapcsolunk össze, amelyek lehet, hogy teljesen valamelyik fürtcsomóponton vannak, vagy nem? Általában az Apache Spark mindennel gyorsan megbirkózik, de néha, különösen, ha valóban sok az adat, még mindig meg kell értened, mi történik az alábbi szinten, és ezt a tudást az Apache Spark teljes körű működéséhez kell használni.
Ma arról fogunk beszélni, hogyan lehet gyorsan futtatni az alkalmazást, és hogyan használhatja fel az összes szükséges erőforrást. Ez a cikk elsősorban a Spark SQL-modulra összpontosít, amely egy Apache Spark-alkalmazást futtat egy statikus kiépítésű Yarn-fürtön. De az általános elképzelések más kiindulási adatokra is alkalmazhatók. Itt a Spark 2.3 / 2.4 verziót vizsgáljuk, hogy jobban megértsük a Spark 3 újításait.
Adatok és hol élnek
Kezdjük azzal az absztrakcióval, amelyet a Spark biztosít számunkra az adatokkal való munkavégzéshez – ez az RDD ( Rugalmas elosztott adatkészlet ). Jelen cikk szempontjából nem számít, hogy DataFrame-mel vagy DataSettel dolgozunk.
Kép 1
Így a fejlesztők számára egy adathalmaz egyetlen objektumként jelenik meg, és azokat részletekben (blokkokban) külön-külön dolgozzák fel a fürt valamely végrehajtójának valamelyik szálában. A blokk a feldolgozás minimális egysége, a végrehajtó kap egy blokkot és egy utasítást, ami megmondja, mit kell tennie ezzel az adattömbbel.
Hogyan működik egy Apache Spark alkalmazás egy fürtben
Magas szinten minden Spark-alkalmazás működése idején egy illesztőprogramból áll – egy programból, amely végrehajtja a fő () függvényt, valamint a fürt csomópontjain futó végrehajtókat. A végrehajtók univerzális katonák, kapnak egy adattömeget (blokkot) és egy utasítást, végrehajtják azt, és a teljesítést jelentik a sofőrnek, hogy megkapja a következő utasítást. Minden végrehajtón több feldolgozási szál futhat, ebben az esetben mindegyik szál a többitől függetlenül dolgozza fel a saját adatblokkját. Így ha az alkalmazásunk indításakor öt végrehajtót rendeltünk négy maggal (szál) a fürtkezelőtől, akkor minden pillanatban 5 * 4 = 20 szálunk van, és legjobb esetben 20 adatblokkot tudunk egyszerre feldolgozni.
Tehát minden feladatot végre kell hajtani:
-
num_executors – azon különálló JVM-folyamatok száma, amelyekben adatfeldolgozási szálak indulnak el (elhelyezkedhetnek ugyanazon a fürtcsomóponton vagy különböző csomópontokon). A folyamatok az alkalmazás végéig futnak;
-
executor_cores Az egyes végrehajtókban futó párhuzamos szálak száma. Egy szál egyszerre egy adatblokkot dolgoz fel.
Kép 2
Hogyan működik az Apache Spark alkalmazás
A Spark History-ban (a Spark-alkalmazások végrehajtási naplóinak kényelmes formában történő megjelenítésére szolgáló webszerver) így néz ki:
Itt két végrehajtót látunk, amelyek mindegyikének négy feldolgozási szála van.
Shuffle Apache Spark Performance Optimization
Tehát kitaláltuk, hogy N adatblokkunk és P szálunk (dolgozónk) van, amely képes ezeket az adatblokkokat párhuzamosan feldolgozni.
És minden rendben lenne velünk, ha ezek a blokkok az alkalmazás végéig élnének, de szinte minden alkalmazásban lesz olyan feldolgozás, amely a blokkjaink teljes átrendezését vonja maga után. Ez például két tábla kulcsonkénti összekapcsolása (JOIN), kulcs szerinti csoportosítás (GROUP BY). Ebben az esetben a jól ismert MapReduce minta mindenkinél működik., amelyben a kulcsonkénti teljes halmaz adatai új adatblokkokba kerülnek újraosztásra úgy, hogy az azonos kulcsú sorok csak egy blokkban legyenek. Ennek a folyamatnak a neve Shuffle in Spark. Miért írtam nagybetűvel? Mivel ez egy nagyon összetett és költséges folyamat, amely növeli a memóriafelhasználást az előadókon, a lemezmemória fogyasztását a fürt csomópontjainál, valamint a fürt csomópontjai közötti hálózati cserét. Nagyon emlékeztet a hernyó pillangóvá alakítására – minden szétesik, hogy új köntösbe rakják újra, ráadásul energiaigényes.
A feladatok szakaszokra bontása
A Sparkban a blokkok egyik Shuffle-ből a másikba történő feldolgozását szakasznak nevezik. Vegye figyelembe, hogy keverés előtt minden blokk párhuzamosan, keverés után szintén párhuzamosan kerül feldolgozásra, de addig nem indul új szakasz, amíg az előző szakasz végén lévő összes blokk át nem ment ezen a folyamaton. Így a szakaszok közötti határ egy várakozási hely a blokkok párhuzamos feldolgozásakor. Vegye figyelembe azt is, hogy egy szakaszon belül az egy blokkon belüli összes feladat (feladat) egymás után történik egy szálon belül. Vagyis a blokkot nem továbbítják sehova a hálózaton, hanem az összes blokkot párhuzamosan dolgozzák fel. Kiderült, hogy a színpad határain belüli blokkok száma változatlan.
Kép 3
A feladat szakaszokra oszlik
A következő képre jutottunk: minden feladat szakaszokra van felosztva, és az egyes szakaszokon belül a blokkok száma állandó és egyenlő…. És itt kezdődik a móka. Ismerjük a dolgozók számát (P = végrehajtók * magok), de hogy hány blokk lesz az egyes szakaszokban, az alkalmazásunk teljesítményét közvetlenül befolyásoló kérdés. Végül is, ha sok a blokk, és kevés az előadó, akkor minden előadó több blokkot dolgoz fel egymás után, és fordítva: ha kevés a blokk, és több az előadó, akkor néhány előadó tétlen lesz, míg a többi fáradhatatlanul dolgoznak. A legérdekesebb itt az, hogy amikor az alkalmazás lassan fut, megpróbálnak több végrehajtót adni neki, de a teljesítmény ebben az esetben nem növekszik.
Kezdjük azzal, hogy szakaszosan kiszámoljuk a munka mennyiségét. A továbbiakban az egyszerűség kedvéért csak egy adathalmaz blokkjait fogjuk figyelembe venni. Egy adott időpontban az előadók több, egymással nem összefüggő szakaszt is feldolgozhatnak. Például a JOIN előtt a két adatkészlet egymástól függetlenül kerül feldolgozásra, és így felosztja egymás között a végrehajtókat. Ebben az esetben a feldolgozó egységek száma lesz az összegük. De céljaink érdekében meg kell értenünk, mi történik egy adatkészlettel. Az első lépésben minden attól függ, honnan származik az adatkészlet. Például, ha HDFS-ből olvas parketta fájlok könyvtárát, akkor az első lépésben a blokkok száma általában megegyezik (a betöltendő könyvtár összes .parquet fájlját alkotó HDFS blokkok száma). Ez azt jelenti, hogy ebben az esetben minden HDFS blokk egy külön adatblokkot képvisel a feldolgozáshoz. De ne felejtsük el, hogy ez a blokk-elosztás a szakasz végéig megmarad. Íme egy nagyszerű példa.
Egy kis fájlt olvasunk a HDFS-ből 150,000 XNUMX bejegyzéssel. A teljes fájl egy HDFS-blokkba illeszkedik. Így az első szakaszban csak egy adatblokk áll rendelkezésünkre, így csak egy előadó tud vele dolgozni. De a transzformáció logikája szerint minden sor tartalmaz egy mező időtartamát (a megtekintési másodpercek számát), és a kimenetben minden sort annyi sorra kell szoroznunk, ahány másodperc a nézés ebben a sorban.
viDF = spark.read.parquet("/tst/vi/") viDF.createOrReplaceTempView("ViewingInterval") spark.sql("""select t.*, explode(get_list_of_seconds(duatation)) második Számként a ViewingIntervalból""" )
A tesztadatok átalakítása nem működik gyorsan. A Spark történetét tekintve a következőket látjuk:
Az első szakaszban egy adatblokk
A Tasks = 1 azt jelenti, hogy ebben a szakaszban csak egy feladat van, mivel csak egy adatblokk van. A bemeneten 2 MB adatot látunk, a kimeneten pedig már van egy kibővített 1 GB adathalmaz. És mindezt egy szál végzi, a többi tétlen, mivel ebben a szakaszban nincs több feladat. Mit tegyünk, végül is kirobban- egy szűk függőség, és emiatt nem szakítja meg a szakaszt, hanem ugyanabban a szakaszban kerül végrehajtásra, amelyben az adatokat olvassák. A színpad keretein belül, mint már tudjuk, a blokkok száma változatlan. Ebben az esetben könnyen (mivel a bemeneti adatkészlet kicsi, és a keverés gyorsan megy végbe) ezt a szakaszt a repartition(N) függvény segítségével könnyedén ketté tudjuk bontani, ami véletlen sorrendű keveréshez vezet, N adatblokkot hozva létre. a kimeneten körülbelül azonos méretű. És mivel keveredik (Shuffle), ez azt jelenti, hogy új szakasz kezdődik.
viDF = spark.read.parquet("/tst/vi/") viDF.repartition(60).createOrReplaceTempView("ViewingInterval") spark.sql("""select t.*, explode(get_list_of_seconds(duatation)) mint secondNumber from ViewingInterval""")
Nézzük a Spark történetét:
Most párhuzamosan folyik a feldolgozás
A második szakaszban – amit most újraparticionálás után felrobbantam – 60 feladatunk (adatblokkunk) van, és az összes előadó dolgozik és nem tétlen. Az átalakulási idő csaknem felére csökkent. A mi feladatunk, hogy ne legyen leállás, és minden előadó dolgozzon, különben miért veszünk el olyan erőforrásokat a klaszterből, amit később nem használunk fel.
Kitaláltuk az első szakaszt, és még azt is megtanultuk, hogyan bonthatunk ketté bármely szakaszt a repartition(N) segítségével. Foglalkozzunk a belső szakaszokkal, amelyek a két keverés között vannak. Itt mindent a spark spark.sql.shuffle.partitions (alapértelmezett 200) paraméter dönt el. Pontosabban úgy döntöttem, hiszen az AQE bevezetésével a Spark megtanulta ezt a mennyiséget maga szabályozni. Tehát minden belső szakasz adatblokkok spark.sql.shuffle.partícióiból áll. De itt sem minden olyan zökkenőmentes: ha nincs sok adatunk, akkor csökkenteni kell ezt a paramétert, ha pedig sok van, növelni kell. A Spark 2.3 esetében pedig az adatoktól függően valamiféle középutat kell keresni.
Mondok egy példát, amikor kevés adatunk van, és alapértelmezés szerint spark.sql.shuffle.partitions = 200. A Spark History-t nézve azt látjuk, hogy az adatkészletünk mindössze 185 sorból áll, és a keverés során 200 blokkra osztották (de itt nem lesz elég 200 blokkhoz). Figyeljük meg, hogy itt zöldre színezzük az előadó igazán hasznos munkáját. Azaz kiderül, hogy az előadónak egy rekordból egy adatblokk feldolgozására fordított teljes munkaidejéből a hasznos idő <10% volt. A hátralévő idő várakozás és deszerializálás.
Mi történik az utolsó szakaszban? Ez megint attól függ, hogy hova adjuk ki a sajátunkat transzformáció adat. Például mindent egy könyvtárba szeretnénk írni parketta fájlként. Ha ezt a keverés után, anélkül, hogy bármit csinálnánk, 200 fájlt találunk ebben a könyvtárban a programunk végrehajtása után. Miért? Ugyanis a keverés után alapból spark.sql.shuffle.partitions = 200 blokkot kaptunk, és mivel egy blokkot egy szál dolgoz fel, azt egy külön fájlba írja ki.
Jellemzően itt akarják a fejlesztők szabályozni a HDFS-ben lévő fájlok számát, és a DataFrame coalesce(N) fájlba történő mentéskor meghívják a metódust. Ez a metódus egyszerűen beírja a készletünk minden blokkját N egyik új blokkjába. Ez összeolvadt (), a valóságban a repartition()-tól eltérően nem vezet keveredéshez, és ezért nem töri meg a szakaszt, csak úgy teszi, hogy a mi szakaszunkban N adatblokk legyen. De ez oda vezet, hogy ebben a szakaszban csak N előadónak lesz munkája. Mi lenne, ha úgy döntenénk, hogy mindent egy fájlba mentünk – csak egy adatfolyam fog működni. Emlékezzünk vissza az első szakaszra vonatkozó okfejtésre, és ha az utolsó szakasz számítási szempontból elég komoly, akkor közvetlenül a mentés előtt (N) célszerű újrapartíciót (N) végezni, hogy az utolsó szakaszt ketté bontsuk: az utolsó előtti, amely a szálak spark.sql.shuffle.partícióin párhuzamosan hajt végre nehéz számításokat (ha előtte volt például csatlakozás), és az utolsó, amely közvetlenül a szükséges számú fájlra menti el (N ) már erőforrásigényes számítások nélkül. Itt meg kell gondolni, hogy mi lesz gyorsabb – mindent úgy hagyni, ahogy van, vagy az újrapartíció(N) hozzáadásával végrehajtani a keverést, ami szintén nem ingyenes, de lehetséges az összetett számítások párhuzamosítása.
dataDF.repartition(1) .write .format("parquet") .mode("overwrite") .option("tömörítés","snappy") .save("/tst/problem_4/result")
Most, hogy kitaláltuk a kapcsolatot a színpadon lévő blokkok száma és a fellépők száma között, mondok egy kis példát. A beviteli szakaszban 20 adatblokk áll rendelkezésünkre, és csak 10 végrehajtó (5 végrehajtó * 2 mag). Azt látjuk, hogy szinte minden végrehajtó egy blokk feldolgozása után kénytelen újabb blokkot készíteni a feldolgozáshoz, mivel átlagosan egy végrehajtónak két adatblokkja van, amelyeket fel kell dolgozni. De emlékezve arra, hogy egy szakaszban az összes adatblokk párhuzamosan is feldolgozható, 20 végrehajtót kérünk a feladatunkhoz (5 végrehajtó * 4 mag), így minden végrehajtó csak egy blokkot dolgoz fel, és a teljes szakasz ideje ideális esetben felezzük. Pontosan ez a helyzet, amikor az erőforrások növelése működik és növeli a sebességet,
Megnövelt erőforrások – gyorsabban működik
Mellesleg, az előző bekezdésben leírt utolsó szakasz megszakításának módszerének egyik érdekes pontja a karbantartás során:
dataDF.repartition(N).write. …
Ha összehasonlítjuk az utolsó szakasz szünet előtti és utáni mutatókat, akkor minden rendben van: az átalakulási idő többszörösére csökkent (mióta az utolsó számításokat minden előadó párhuzamosan végezte), eltűnt a Shuffle Spill (ekkor az előadó nem rendelkezik elegendő memóriával, és egyfajta cserét intéz a helyivel. Természetesen ebben az esetben az összes adat több nagy blokkban érkezett, és az előadók nehezen emésztették meg azokat).
ÁLLJ MEG! Nézzük meg közelebbről a mentéskor kapott fájlok méretét. 5.9 GB volt, most 10.3 GB, a rekordok száma megegyezik, az adatok összetétele ugyanaz. Miért? Ez egy légy a kenőcsben!
Ügyeljen a kimenet méretére
Csak hozzáadva. A repartition()már rájöttünk, hogy véletlenszerűen osztja el az adatokat. Vagyis az utolsó keverés után kulcsonként részben rendezett adatok helyett (esetünkben a JOIN volt), véletlenszerűen elosztott adatokat kapunk. Emlékezzünk vissza, hogy a parketta egy oszlopos fájltárolási formátum, és a benne lévő adatokat tömörítjük, kihasználva, hogy részben az oszlopba rendelhetők. Kiderült, hogy véletlenszerűséget vittünk be a sorok elosztásába, és így majdnem kétszeresére rontottuk az adatok tömöríthetőségét. Mit tehetsz ellene? A rendelés visszaküldése lehetséges, de minden adatblokkon belül.
dataDF.repartition(20). sortWithinPartitions(asc("id")).write. …
A sortWithinPartitions() függvény mezőnként vagy blokkon belül több mező szerint rendez, azaz nem történik keverés, minden működik egy előadón belül a memóriájában. Miután ezt a függvényt alkalmaztuk a több mező szerinti rendezési átalakításunkban, a kimeneti fájlok teljes mérete még valamivel kisebb lett, mint eredetileg. Most minden gyorsan működik nekünk, a kimeneti fájlok mérete megfelel nekünk. Ráadásul ebben az esetben nagyjából azonos méretű fájlokat rögzítettünk HDFS-ben (ez egy következmény repartition()), ami kényelmes lehet a további feldolgozáshoz.
Optimizer az Apache Spark teljesítményoptimalizálásához
Mivel érintettük a parketta formátumú fájlt, látni fogjuk, hogyan működik a Spark optimalizáló egy olyan optimalizáló szabály példáján, mint a predikátum lenyomása és a vetítési lenyomás.
A vetületes lehúzásnál különösen az oszlopos formájú parketta nyer. Hadd emlékeztessem önöket arra, hogy a lekérdezési fa tényleges végrehajtása csak a művelet végrehajtásának időpontjában kezdődik, vagyis egy olyan művelet, amely adatokat ad ki: átadja a főprogramot (illesztőprogramot) (gyűjti, számolja, ..), tárolja a fájl átvitele egy adatbázisba stb.… Ennek során a Spark lekérdezési fát épít fel és optimalizálja azt. Így a lekérdezés felépítésekor az optimalizáló már tudja, hogy mely mezők szükségesek az eredmény eléréséhez, és csak ezeket a mezőket olvassa ki a fájlból. Mivel az oszlopos fájlformátumban az adatok oszlopok kontextusában tárolódnak, csak ezek a mezők kerülnek kiolvasásra a fájlból.
Tekintsük az optimalizáló predikátum lenyomási szabályát. Ennek az optimalizálásnak az elve meglehetősen egyszerű: sok adatunk van, és nem kell feldolgozni azokat, ha végül nem hasznosak, például a lekérdezésfánk végrehajtásának végén szűrni kell őket. Az optimalizáló megpróbál minden feltételt és szűrőt a lehető legalacsonyabb szintre csökkenteni – közelebb az adatforrásokhoz, ideális esetben a fájl közvetlen beolvasása előtt (vagy például egy RDBMS-hez intézett lekérdezés előtt).
Nézzünk egy példát:
Íme a generált lekérdezés fizikai végrehajtási terve:
Figyeljünk a fájlból történő közvetlen olvasás blokkjára (FileScan parquet) és a PushedFilters blokkra – ezek a feltételek a fájl fizikai olvasása során lesznek érvényben. Látjuk, hogy három feltétel érkezett ide:
-
a ValueDatecondition IsNotNulland LessThanOrEqual-hoz ez utóbbival egyértelmű, ez tükröződik az SQL-ünkben. Honnan jött az IsNotNulltól? Nyilvánvaló, hogy a kérésünkben ValueDate <= állandó feltétel van, és a NULL értékek nem felelnek meg ennek a feltételnek, vagyis logikailag minden helyes. De miért teszi ezt a feltételt külön a parkettareszelő optimalizáló? Erről bővebben a következő bekezdésben;
-
a SubjectID feltételhez: IsNotNull. De nincs ilyen feltételünk a kérésben, és általában nincs feltétel a SubjectID számára. Ezen a mezőn csak egy LEFT JOIN van, ahol az asztalunk a főhöz van kapcsolva. Igen, pontosan: ilyen JOIN esetén az összes olyan sor, ahol a Tárgyazonosító NULL, nem kerül bele a kapott kijelölésbe. Azt látjuk, hogy az optimalizáló ezt figyelembe veszi, és a legelején nem is olvas ki ilyen sorokat a fájlból.
Még mindig találjuk ki, mi olyan érdekes az IsNotNullt feltételben, hogy az optimalizáló külön adja hozzá. Ehhez nézzük meg a parkettareszelő szerkezetét. Te tudod használni ehhez parkettaszerszámok. A helyzet az, hogy a parkettafájl a sémával együtt néhány statisztikát is tárol a mezőkről sorcsoportok összefüggésében.
Belül parketta reszelő
Azt látjuk, hogy minden egész számtípusnál ott van az értékek száma ( Values ), a NULL-ok száma ebben a blokkban ( Null Values ), valamint az oszlop Min és Max értékei ebben a sorcsoportban. Azonnal felidézzük állapotunkat az IsNotNull mezőn. Vagyis ha ebben a csoportban a SubjectID mező értékei = nulla értékek, akkor arra a következtetésre juthatunk, hogy ebben a sorcsoportban minden érték NULL, és egyáltalán nem olvassa ezt a blokkot. Ugyanez vonatkozik a több, kevesebb, egyenlő feltételekre is – itt használhatja az oszlop Min és Max értékeit, és levonhatja a következtetést – hogy egyáltalán el kell-e olvasni ezt a sorcsoportot.
Fontos megérteni, hogy egy feltételt csak akkor lehet alacsonyabb szintre csökkenteni, ha az előre ismert a lekérdezés végrehajtásának megkezdése előtt.
Az Apache Spark Performance Optimization valós példája
A parketta fájlkönyvtár mezőnként particionálva van. A mezőhöz tartozó szűrőértékeket tartalmazó, vesszővel elválasztott karakterláncot átadtuk az átalakításnak. A fejlesztő explode-ot (split(filter)) készített, vagyis ebből a sorból egy kis táblázatot készített értékekkel, és tiszta lelkiismerettel készített egy INNER JOIN-t a szűrendő főtáblával. Az átalakulás lassan működött. Nézzük a lekérdezési tervet:
Az összes partíció a HDFS-ből olvasható
Furcsa, de az első szakaszban a Spark kivonja az összes partíciót ( PartitionCount =121), bár átadunk egy szűrőt, amely csak egy értékből áll. Pontosan ez az a helyzet, amikor egy lekérdezési fa felépítésekor a Spark egyáltalán nem tud a szűrőről, mert az a JOIN mögött van elrejtve.
A szűrőértékeket tartalmazó táblázat létrehozása helyett egyszerűen a szabványos Spark SQL függvényt használjuk find_in_set (). Megkeresi az alkarakterlánc pozícióját a karakterláncban, amely egy vesszővel elválasztott lista.
Vagyis a szűrő most egy egyszerű kifejezést képvisel: ahol find_in_set(surveyprogectid, )
És ha megnézzük a lekérdezés végrehajtási tervet, mivel annak felépítésekor az optimalizáló már ismeri a szűrő karakterláncot és a feltételt, ezt a feltételt leengedi a fájlból való kiolvasás szintjére. Ráadásul, tudva, hogy ez egy particionálási mező, alkalmazza a partíció metszés szabályát, azaz kidobja a szűrőnek nem megfelelő partíciókat a mérlegelésből.
Csak egy partíció kerül beolvasásra a HDFS-ből
Kérjük, vegye figyelembe, hogy a feltételünk most a PartitionFilters blokkban van, mivel a mező particionál, ezért csak a szükséges partíció kerül kivonásra a HDFS-ből ( PartitionCount = 1).
Ezért, ha van egy nagy táblája particionálással, és néhány partíciót a JOIN-en keresztül választ ki, akkor jobb lehet egy külön műveletet létrehozni, hogy ebből a szűrt táblából karakterláncként értéklistát hozzon létre, és konstansként adja át a fő lekérdezés feltétele.
Pain Point of Apache Spark Optimizer
Az optimalizáló nagyszerű munkája… De néha káros lehet az a tendencia, hogy a lehető legalacsonyabbra hozza a feltételt a forrásig. UDF (felhasználó által meghatározott funkció ) lép a színre. A felhasználó által definiált funkció a Spark optimalizáló fekete doboza.
Tekintsük a következő példát:
Van egy nagy fájlunk, több milliárd sorral. Csak egyedi azonosítókat akarunk kiválasztani, és az UDF-ünket alkalmazni akarjuk rájuk, majd csak azokat a találatokat szeretnénk kiválasztani, amelyek nullák lesznek. A kérések sorrendje:
T1=> válasszuk külön azonosító ból ből T T2=> válasszuk UDF(ID) as newID ból ből T1 T3=> válasszuk * ból ből T2 ahol newID is null
Csak néhány ezer egyedi azonosítóérték található a táblázatban, és az UDF-ünk nem működik gyorsan - a HBase-hez megy. Vagyis mi, miután felépítettünk egy ilyen lekérdezési fát, arra számítunk, hogy az UDF-ünket több ezerszer hívják meg. Elindítjuk a kérést, és sokáig várunk.
Nézzük a lekérdezés végrehajtási tervet:
Az UDF állapota majdnem a legalacsonyabb szintre esett
… Ó! Az optimalizáló minden tőle telhetőt megtett: őszintén a fájl közvetlen beolvasása után azonnal a Null(UDF(id)) feltételt a szintre csökkentette, egészen addig a pillanatig, amikor csak egyedi azonosítót választunk. Ez azt jelenti, hogy nehéz UDF-ünk több milliárdszor próbálkozott több ezer helyett.
Mit gondolhat itt? Például végezze el a gyorsítótárat (persist) az egyedi azonosító (T1) kiszámítása után. Vagy használjon oldalnézetet, amelyen az optimalizáló nem adja tovább a feltételt.
válasszuk udf_res as newID ból ből T1 oldalsó Kilátás felrobban (tömb(UDF(ID))) as udf_res
Már az elején megkaptuk, amit akartunk – az UDF-et csak egyedi azonosítókra számítják ki:
Következtetés
A cikk hatókörén kívül vannak kérdések a JOIN optimalizálásával kapcsolatban: sugárzás, adatferdítés, az összevonás és az újrapartíció előnyei és hátrányai. Egyes pontokat kellő részletességgel ismertetünk itt, néhányat pedig nem. Szóval, látogassa meg AnalyticsVidhya az alapokhoz, és ennek használatával a teljesítmény rendkívül optimalizált. Nézze meg a mellékelt képernyőképeket is!
Hivatkozás:
1. kép: https://mallikarjuna_g.gitbooks.io/spark/content/diagrams/spark-rdds.png
Image 2: https://media.springernature.com/lw685/springer-static/image/art%3A10.1007%2Fs11227-019-03093-0/MediaObjects/11227_2019_3093_Fig1_HTML.png
3. kép: https://platoaistream.net/wp-content/uploads/2021/09/apache-spark-performance-optimization-for-data-engineers-3.png
- "
- 000
- 9
- Fiók
- Akció
- Előny
- Minden termék
- között
- analitika
- Apache
- Apache Spark
- app
- Alkalmazás
- alkalmazások
- cikkben
- Alapjai
- BEST
- Big adatok
- Billió
- Fekete
- Doboz
- Épület
- hívás
- közelebb
- kód
- Oszlop
- fogyasztás
- dátum
- adatfeldolgozás
- adatkészlet
- adatbázis
- üzlet
- részlet
- Fejlesztő
- fejlesztők
- DID
- állásidő
- gépkocsivezető
- Mérnökök
- belép
- stb.
- csere
- végrehajtás
- GYORS
- Fields
- Ábra
- Szűrők
- végén
- vezetéknév
- Összpontosít
- forma
- formátum
- Keretrendszer
- Ingyenes
- móka
- funkció
- általános
- nagy
- Zöld
- Csoport
- itt
- Magas
- történelem
- Hogyan
- How To
- HTTPS
- Növelje
- IT
- Munka
- csatlakozik
- Kulcs
- tudás
- nagy
- indít
- vezet
- tanult
- szint
- vonal
- Lista
- helyi
- Hosszú
- Mérkőzés
- Média
- közepes
- Legnepszerubb
- hálózat
- csomópontok
- érdekében
- Más
- Egyéb
- Mintás
- Fizet
- teljesítmény
- fizikai
- kép
- Népszerű
- Program
- projektek
- Olvasás
- Valóság
- nyilvántartások
- jelentést
- Tudástár
- REST
- Eredmények
- futás
- futás
- megtakarítás
- Tudomány
- értelemben
- készlet
- Egyszerű
- Méret
- kicsi
- So
- sebesség
- osztott
- SQL
- Színpad
- kezdet
- statisztika
- tárolás
- árnyékolók
- megmondja
- teszt
- Az alapok
- A háztömb
- The Source
- idő
- Átalakítás
- Egyetemes
- us
- érték
- Megnézem
- várjon
- háló
- webszerver
- Mi
- belül
- Munka
- dolgozók
- művek
- írás