Ehitage Greteli ja Apache Airflow abil sünteetiline andmekonveier

Allikasõlm: 1068200

Ehitage Greteli ja Apache Airflow abil sünteetiline andmekonveier

Selles ajaveebipostituses koostame ETL-i torujuhtme, mis genereerib sünteetilisi andmeid PostgreSQL-i andmebaasist, kasutades Greteli sünteetiliste andmete API-sid ja Apache Airflow'i.


By Drew Newberry, tarkvarainsener aadressil Gretel.ai

Ehitage Greteli ja Apache Airflow abil sünteetiline andmekonveier

Hei inimesed, minu nimi on Drew ja ma olen siin Gretelis tarkvarainsener. Olen hiljuti mõelnud Greteli API-de integreerimise mustritele olemasolevatesse tööriistadesse, et oleks lihtne luua andmetorusid, kus turvalisus ja klientide privaatsus on esmaklassilised funktsioonid, mitte ainult järelmõte või kast, mida kontrollida.

Üks Greteli inseneride ja klientide seas populaarne andmetöötlustööriist on Apache Airflow. See juhtub ka Greteliga suurepäraselt töötama. Selles ajaveebi postituses näitame teile, kuidas luua Airflow, Greteli ja PostgreSQL-i abil sünteetilist andmetorustikku. Hüppame sisse!

Mis on õhuvool

 
 
Õhuvool on töövoo automatiseerimise tööriist, mida tavaliselt kasutatakse andmekonveierite loomiseks. See võimaldab andmeinseneridel või andmeteadlastel Pythoni ja muude tuttavate konstruktsioonide abil neid torujuhtmeid programmiliselt määratleda ja juurutada. Õhuvoolu tuumaks on DAG-i ehk suunatud atsüklilise graafiku kontseptsioon. Airflow DAG pakub mudelit ja API-de komplekti torujuhtme komponentide, nende sõltuvuste ja täitmisjärjekorra määratlemiseks.

Võite leida Airflow torujuhtmeid, mis kopeerivad andmeid tooteandmebaasist andmelattu. Teised torujuhtmed võivad täita päringuid, mis liidavad normaliseeritud andmed üheks andmestikuks, mis sobib analüüsiks või modelleerimiseks. Veel üks torujuhe võib avaldada igapäevase aruande, mis koondab peamised ärimõõdikud. Nende kasutusjuhtumite ühine teema: andmete liikumise koordineerimine süsteemide vahel. Siin paistab Airflow.

Õhuvoolu ja selle rikkaliku ökosüsteemi võimendamine integratsioon, saavad andmeinsenerid ja teadlased koondada mis tahes arvu erinevaid tööriistu või teenuseid üheks ühtseks torujuhtmeks, mida on lihtne hooldada ja kasutada. Nende integreerimisvõimaluste mõistmisel hakkame nüüd rääkima sellest, kuidas saab Greteli integreerida Airflow torujuhtmesse, et parandada tavalisi andmeoperatsioonide töövooge.

Kuidas Gretel sinna sobib?

 
 
Meie Greteli missiooniks on muuta andmetega töötamine lihtsamaks ja turvalisemaks. Klientidega rääkides on üks valupunkt, millest me sageli kuuleme, aeg ja pingutus, mis on vajalik andmeteadlaste juurdepääsu saamiseks tundlikele andmetele. Kasutades Gretel Sünteetika, saame vähendada tundlike andmetega töötamise riski, luues andmekogumist sünteetilise koopia. Greteli integreerimisel Airflowga on võimalik luua iseteeninduslikke torujuhtmeid, mis hõlbustavad andmeteadlastel vajalike andmete kiiret hankimist, ilma et oleks vaja andmeinseneri iga uue andmepäringu jaoks.

Nende võimaluste demonstreerimiseks loome ETL-i konveieri, mis eraldab andmebaasist kasutajate tegevuste funktsioonid, genereerib andmestiku sünteetilise versiooni ja salvestab andmestiku S3-sse. S3-sse salvestatud sünteetilise andmestikuga saavad andmeteadlased seda seejärel kasutada allavoolu modelleerimiseks või analüüsimiseks, ilma et see kahjustaks klientide privaatsust.

Asjade käivitamiseks vaatame esmalt torujuhtmest linnulennult. Selle diagrammi iga sõlm tähistab torujuhtme etappi või õhuvoo terminites "ülesannet".



Näide Greteli sünteetilisest torujuhtmest Airflow'l.

 

Saame jagada konveieri kolmeks etapiks, sarnaselt sellega, mida võite leida ETL-i torujuhtmest:

  • ekstrakt – Ülesanne extract_features teeb päringu andmebaasist ja teisendab andmed funktsioonide kogumiks, mida andmeteadlased saavad mudelite koostamiseks kasutada.
  • Sünteesige – Gene_synthetic_features kasutab ekstraktitud funktsioone sisendina, treenib sünteetilise mudeli ja loob seejärel Greteli API-de ja pilveteenuste abil sünteetilise funktsioonide komplekti.
  • Koormus – upload_synthetic_features salvestab sünteetilise funktsioonide komplekti S3-sse, kus seda saab sisestada mis tahes allavoolu mudelisse või analüüsi.

Järgmistes osades käsitleme kõiki neid kolme etappi üksikasjalikumalt. Kui soovite iga koodinäidist jälgida, võite minna aadressile gretelai/gretel-airflow-pipelines ja laadige alla kogu selles blogipostituses kasutatud kood. Repo sisaldab ka juhiseid, mida saate järgida Airflow eksemplari käivitamiseks ja torujuhtme otsast lõpuni käitamiseks.

Lisaks võib olla kasulik vaadata Airflow torujuhet tervikuna enne iga komponendi lahkamist, dags/airbnb_user_bookings.py. Järgmistes jaotistes olevad koodilõigud ekstraheeritakse lingitud kasutaja broneerimiskonveierist.

Väljavõtte funktsioonid

 
 
Esimene ülesanne, extract_features vastutab lähteandmebaasist toorandmete eraldamise ja funktsioonide komplektiks muutmise eest. See on tavaline funktsioonitehnoloogia probleem, mida võite leida mis tahes masinõppe- või analüüsikonveierist.

Meie näidiskonveieris loome PostgreSQL-i andmebaasi ja laadime selle koos broneerimisandmetega Airbnb Kaggle võistlus.

See andmestik sisaldab kahte tabelit: Kasutajad ja Seansid. Seansid sisaldab võõrvõtme viidet kasutaja_id. Seda seost kasutades loome funktsioonide komplekti, mis sisaldab erinevaid kasutajate kaupa koondatud broneerimismõõdikuid. Järgmine joonis kujutab funktsioonide koostamiseks kasutatud SQL-päringut.

WITH session_features_by_by_user AS ( SELECT user_id, count(*) AS number_of_actions_cent, count(DISTINCT action_type) AS number_of_unique_actions, round(keskm.(sek_kulunud)) AS avg_session_time_time_seconds, AS min_session_time_seconds, ( SELECT count(*) FROM sessions s WHERE s.user_id = user_id AND s.action_type = 'broneeringu_taotlus') AS total_bookings FROM sessions GROUP BY user_id ) SELECT u.id AS kasutaja_id, u.gender, u.gender .Language, U.SignUp_Method, U.Date_Account_Created, s.number_of_actions_taken, s.number_of_unique_actions, S.AVG_SESSION_TIME_SECONDS, S.MIN_SESSION_TIME_SIME_SECONDS, S.Max_SESSION_SESSION_SECONDS SESSIAALSEURID UNERSEURID UNEKTSIOONID SESSIOONIL SESSIOONI_SEAURID UNAKERS


Seejärel käivitatakse SQL-päring meie Airflow torujuhtmest ja kirjutatakse S3 vahepealsesse asukohta, kasutades järgmist ülesande definitsiooni.

@task() def extract_features(sql_file: str) -> str: kontekst = get_current_context() sql_query = Tee(sql_file).read_text() võti = f"{context['dag_run'].run_id}_booking_features.csv" koos NamedTemporaryFile (mode="r+", suffix=".csv") kui tmp_csv: postgres.copy_expert( f"kopeeri ({sql_query}) stdouti koos csv-päisega", tmp_csv.name ) s3.load_file( failinimi=tmp_csv.name, võti=võti, ) tagastusklahv


Ülesande sisend sql_file määrab, millist päringut andmebaasis käivitada. See päring loetakse ülesandele sisse ja seejärel käivitatakse andmebaasis. Seejärel kirjutatakse päringu tulemused S3-sse ja kaugfaili võti tagastatakse ülesande väljundina.

Allolev ekraanipilt näitab ülalt ekstraheerimispäringu näidistulemust. Järgmises jaotises kirjeldame, kuidas sellest andmekogumist sünteetiline versioon luua.



Päringu tulemuste eelvaade.

Sünteesige funktsioone Greteli API-de abil

 
 
Iga funktsiooni sünteetilise versiooni loomiseks peame esmalt treenima sünteetilise mudeli ja seejärel käivitama mudeli sünteetiliste kirjete genereerimiseks. Gretel on Pythoni SDK-de komplekt, mis hõlbustab Airflow ülesannetesse integreerimist.

Lisaks Pythoni kliendi SDK-dele oleme loonud a Gretel õhuvoolu konks mis haldab Greteli API ühendusi ja saladusi. Pärast Gretel Airflow Connectioni seadistamist on Greteli API-ga ühenduse loomine sama lihtne kui

from hooks.gretel import GretelHook gretel = GretelHook() projekt = gretel.get_project()


Lisateavet Airflow ühenduste konfigureerimise kohta leiate meie Githubi hoidlast README.

Ülaltoodud näite projektimuutujat saab kasutada peamise sisenemispunktina Greteli API abil sünteetiliste mudelite treenimisel ja käitamisel. Lisateabe saamiseks võite tutvuda meiega Pythoni API dokumendid.

Viidates tagasi broneerimiskonveierile, vaatame nüüd üle ülesande generate_synthetic_features. See samm vastutab sünteetilise mudeli treenimise eest, kasutades eelmises ülesandes ekstraheeritud funktsioone.

@task() def gener_synthetic_features(data_source: str) -> str: project = gretel.get_project() model = project.create_model_obj( model_config="synthetics/default", data_source=s3.download_file(data_source) ) model.submit_c poll(modell) return model.get_artifact_link("data_preview")


Vaadates meetodi allkirja, näete, et see võtab tee, data_source. See väärtus osutab eelmises etapis ekstraheeritud S3 funktsioonidele. Hilisemas osas kirjeldame, kuidas kõik need sisendid ja väljundid on omavahel ühendatud.

Mudeli loomisel kasutades project.create_model_obj, esindab parameeter model_config mudeli genereerimiseks kasutatud sünteetilise mudeli konfiguratsiooni. Selles torujuhtmes kasutame oma vaikemudeli konfiguratsioon, aga paljud teised seadistamisvalikud on olemas.

Pärast mudeli konfigureerimist kutsume välja model.submit_cloud(). See esitab mudeli Gretel Cloudi abil koolituseks ja rekordite genereerimiseks. Küsitluse (mudeli) kutsumine blokeerib ülesande seni, kuni mudel on koolituse läbinud.

Nüüd, kui mudel on välja õpetatud, kasutame genereeritud sünteetiliste funktsioonide allalaadimiseks linki tagastamiseks get_artifact_link.



Funktsioonide sünteetilise komplekti andmete eelvaade.

 

Seda artefakti linki kasutatakse viimase upload_synthetic_features sammu sisendina.

Laadige sünteetilisi funktsioone

 
 
Algfunktsioonid on ekstraheeritud ja sünteetiline versioon on loodud. Nüüd on aeg üles laadida sünteetilised funktsioonid, et neile oleks juurdepääs alltarbijatel. Selles näites kasutame andmestiku lõppsihtkohana S3 ämbrit.

@task() def upload_synthetic_features(data_set: str): kontekst = get_current_context() koos open(data_set, "rb") kui synth_features: s3.load_file_obj( file_obj=synth_features, key=f"{..._booking_features_v", )


See ülesanne on üsna lihtne. Sisendväärtus data_set sisaldab allkirjastatud HTTP linki sünteetilise andmekogumi allalaadimiseks Greteli API-st. Ülesanne loeb selle faili Airflow töötajasse ja kasutab seejärel juba konfigureeritud S3 konksu sünteetilise funktsioonifaili üleslaadimiseks S3 ämbrisse, kus järgnevad tarbijad või mudelid saavad sellele juurde pääseda.

Torujuhtme orkestreerimine

 
 
Viimase kolme jaotise jooksul oleme läbi käinud kogu koodi, mis on vajalik andmestiku ekstraktimiseks, sünteesimiseks ja laadimiseks. Viimane samm on kõigi nende ülesannete ühendamine üheks Airflow torujuhtmeks.

Kui meenutate selle postituse algust, mainisime lühidalt DAG-i kontseptsiooni. Airflow'i TaskFlow API abil saame need kolm Pythoni meetodit koostada DAG-ks, mis määratleb sisendid, väljundid ja iga sammu käitamise järjekorra.

funktsiooni_tee = extract_features( "/opt/airflow/dags/sql/session_rollups__by_user.sql" ) synthetic_data = gener_synthetic_features(feature_path) upload_synthetic_features(synthetic_data)


Kui järgite nende meetodikutsete teed, saate lõpuks graafiku, mis näeb välja nagu meie algne funktsioonide konveier.



Greteli sünteetilisest materjalist torujuhe õhuvoolul.

 

Kui soovite seda torujuhet käivitada ja seda töös näha, minge aadressile kaasasoleva Githubi hoidla. Sealt leiate juhised Airflow eksemplari käivitamiseks ja torujuhtme otsast lõpuni käivitamiseks.

Asjad kokku panema

 
 
Kui olete nii kaugele jõudnud, olete näinud, kuidas Greteli saab integreerida Airflow-le ehitatud andmekanalisse. Kombineerides Greteli arendajasõbralikke API-sid ning Airflow'i võimsat konksude ja operaatorite süsteemi, on lihtne luua ETL-i torujuhtmeid, mis muudavad andmed kättesaadavamaks ja turvalisemaks kasutamiseks.

Rääkisime ka tavalisest funktsioonide inseneri kasutamise juhtumist, kus tundlikud andmed ei pruugi olla hõlpsasti juurdepääsetavad. Andmestiku sünteetilise versiooni loomisega vähendame tundlike andmete paljastamise ohtu, kuid säilitame siiski andmestiku kasulikkuse, tehes selle kiiresti kättesaadavaks neile, kes seda vajavad.

Mõeldes funktsioonide konveierile abstraktsemalt, on meil nüüd muster, mida saab kasutada mis tahes arvu uute SQL-päringute jaoks. Konveieri uue versiooni juurutamisel ja esialgse SQL-päringu väljavahetamisega saame iga potentsiaalselt tundliku päringu esitada sünteetilise andmestikuga, mis säilitab klientide privaatsuse. Ainus koodirida, mida tuleb muuta, on sql-faili tee. Keerulist andmetöötlust pole vaja.

Täname lugemise eest

 
 
Saatke meile e-kiri aadressil hi@gretel.ai või tule meiega kaasa Lõtv kui teil on küsimusi või kommentaare. Meile meeldiks kuulda, kuidas te Airflow'i kasutate ja kuidas saaksime teie olemasolevate andmekanalitega kõige paremini integreerida.

 
Bio: Drew Newberry on Gretel.ai tarkvarainsener.

Originaal. Loaga uuesti postitatud.

Seotud:

Allikas: https://www.kdnuggets.com/2021/09/build-synthetic-data-pipeline-gretel-apache-airflow.html

Ajatempel:

Veel alates KDnuggets