Zgradite sintetični podatkovni cevovod z uporabo Gretel in Apache Airflow

Izvorno vozlišče: 1068200

Zgradite sintetični podatkovni cevovod z uporabo Gretel in Apache Airflow

V tej objavi v spletnem dnevniku gradimo cevovod ETL, ki generira sintetične podatke iz baze podatkov PostgreSQL z uporabo Gretelovih API-jev za sintetične podatke in Apache Airflow.


By Drew Newberry, programski inženir pri Gretel.ai

Zgradite sintetični podatkovni cevovod z uporabo Gretel in Apache Airflow

Hey folks, my name is Drew, and I’m a software engineer here at Gretel. I’ve recently been thinking about patterns for integrating Gretel APIs into existing tools so that it’s easy to build data pipelines where security and customer privacy are first-class features, not just an afterthought or box to check.

One data engineering tool that is popular amongst Gretel engineers and customers is Apache Airflow. It also happens to work great with Gretel. In this blog post, we’ll show you how to build a synthetic data pipeline using Airflow, Gretel and PostgreSQL. Let’s jump in!

Kaj je pretok zraka

 
 
Pretok zraka je orodje za avtomatizacijo poteka dela, ki se običajno uporablja za gradnjo podatkovnih cevovodov. Podatkovnim inženirjem ali podatkovnim znanstvenikom omogoča, da programsko definirajo in uvedejo te cevovode z uporabo Pythona in drugih znanih konstrukcij. V središču Airflow je koncept DAG ali usmerjenega acikličnega grafa. DAG Airflow zagotavlja model in niz API-jev za definiranje komponent cevovoda, njihovih odvisnosti in vrstnega reda izvajanja.

Morda boste našli cevovode Airflow, ki posnemajo podatke iz podatkovne baze izdelkov v podatkovno skladišče. Drugi cevovodi lahko izvajajo poizvedbe, ki združujejo normalizirane podatke v en nabor podatkov, primeren za analitiko ali modeliranje. Še en cevovod lahko objavi dnevno poročilo, ki združuje ključne poslovne meritve. Skupna tema teh primerov uporabe: usklajevanje pretoka podatkov med sistemi. Tukaj Airflow blesti.

Izkoriščanje Airflowa in njegovega bogatega ekosistema integracije, lahko podatkovni inženirji in znanstveniki orkestrirajo poljubno število različnih orodij ali storitev v enoten poenoten cevovod, ki ga je enostavno vzdrževati in upravljati. Z razumevanjem teh integracijskih zmožnosti se bomo zdaj začeli pogovarjati o tem, kako bi lahko Gretel integrirali v cevovod Airflow za izboljšanje običajnih delovnih tokov podatkovnih operacij.

Kako se Gretel prilega?

 
 
Pri Gretel je naše poslanstvo narediti podatke lažje in varnejše za delo. Ko se pogovarjamo s strankami, je ena bolečina, o kateri pogosto slišimo, čas in trud, ki sta potrebna za pridobitev dostopa podatkovnih znanstvenikov do občutljivih podatkov. Uporaba Sintetika Gretel, lahko zmanjšamo tveganje dela z občutljivimi podatki z ustvarjanjem sintetične kopije nabora podatkov. Z integracijo Gretel z Airflow je mogoče ustvariti samopostrežne cevovode, ki podatkovnim znanstvenikom olajšajo hitro pridobivanje podatkov, ki jih potrebujejo, ne da bi za vsako novo zahtevo po podatkih potrebovali podatkovnega inženirja.

Za predstavitev teh zmožnosti bomo zgradili cevovod ETL, ki ekstrahira funkcije uporabniške dejavnosti iz baze podatkov, generira sintetično različico nabora podatkov in shrani nabor podatkov v S3. S sintetičnim naborom podatkov, shranjenim v S3, ga lahko podatkovni znanstveniki nato uporabijo za nadaljnje modeliranje ali analizo brez ogrožanja zasebnosti strank.

Za začetek si najprej oglejmo cevovod iz ptičje perspektive. Vsako vozlišče v tem diagramu predstavlja korak cevovoda ali »nalogo« v smislu Airflow.



Primer sintetičnega cevovoda Gretel na Airflow.

 

Cevovod lahko razdelimo na 3 stopnje, podobno kot v cevovodu ETL:

  • Izvleček – The extract_features task will query a database, and transform the data into a set of features that can be used by data scientists for building models.
  • Sintetizirajte – generate_synthetic_features will take the extracted features as input, train a synthetic model, and then generate a synthetic set of features using Gretel APIs and cloud services.
  • Obremenitev – upload_synthetic_features saves the synthetic set of features to S3 where it can be ingested into any downstream model or analysis.

V naslednjih nekaj razdelkih se bomo podrobneje poglobili v vsakega od teh treh korakov. Če želite spremljati vsak vzorec kode, se lahko odpravite na gretelai/gretel-zračni-cevovodi in prenesite vso kodo, uporabljeno v tej objavi v spletnem dnevniku. Repo vsebuje tudi navodila, ki jim lahko sledite, da zaženete primerek Airflow in zaženete cevovod od konca do konca.

Poleg tega je morda koristno, če si ogledate cevovod Airflow v celoti, preden razčlenimo vsako komponento, dags/airbnb_user_bookings.py. Delčki kode v naslednjih razdelkih so izvlečeni iz povezanega uporabniškega cevovoda za rezervacije.

Funkcije izvlečka

 
 
Prva naloga, extract_features je odgovorna za pridobivanje neobdelanih podatkov iz izvorne baze podatkov in njihovo pretvorbo v nabor funkcij. To je običajno funkcijski inženiring težavo, ki jo lahko najdete v katerem koli cevovodu strojnega učenja ali analitike.

V našem primeru cevovoda bomo zagotovili bazo podatkov PostgreSQL in jo naložili s podatki o rezervaciji iz Tekmovanje Airbnb Kaggle.

Ta nabor podatkov vsebuje dve tabeli, Uporabniki in Seje. Seje vsebujejo sklic na tuji ključ, user_id. Z uporabo tega odnosa bomo ustvarili niz funkcij, ki bodo vsebovale različne meritve rezervacij, združene po uporabniku. Naslednja slika predstavlja poizvedbo SQL, uporabljeno za izdelavo funkcij.

WITH session_features_by_user AS ( SELECT user_id, count(*) AS number_of_actions_taken, count(DISTINCT action_type) AS number_of_unique_actions, round(avg(secs_elapsed)) AS avg_session_time_seconds, round(max(secs_elapsed)) AS max_session_time_seconds, round(min(sekunde) _preteklo)) AS min_session_time_seconds, ( SELECT count(*) FROM sessions s WHERE s.user_id = user_id AND s.action_type = 'booking_request') AS total_bookings FROM sessions GROUP BY user_id
)
IZBERITE u.id AS user_id, u.gender, u.age, u.language, u.signup_method, u.date_account_created, s.number_of_actions_taken, s.number_of_unique_actions, s.avg_session_time_seconds, s.min_session_time_seconds, s.max_session_time_seconds
FROM session_features_by_user s LEFT JOIN uporabniki u ON u.id = s.user_id
OMEJITEV 5000


Poizvedba SQL se nato izvede iz našega cevovoda Airflow in zapiše na vmesno lokacijo S3 z uporabo naslednje definicije naloge.

@task()
def extract_features(sql_file: str) -> str: context = get_current_context() sql_query = Path(sql_file).read_text() key = f"{context['dag_run'].run_id}_booking_features.csv" with NamedTemporaryFile(mode=" r+", suffix=".csv") kot tmp_csv: postgres.copy_expert( f"kopiraj ({sql_query}) v stdout z glavo csv", tmp_csv.name ) s3.load_file( filename=tmp_csv.name, key=key, ) povratni ključ


Vnos v nalogo, sql_file, določa, katero poizvedbo je treba izvesti v bazi podatkov. Ta poizvedba bo prebrana v nalogo in nato izvedena v bazi podatkov. Rezultati poizvedbe bodo nato zapisani v S3 in ključ oddaljene datoteke bo vrnjen kot rezultat naloge.

Spodnji posnetek zaslona prikazuje vzorčni niz rezultatov poizvedbe za ekstrakcijo od zgoraj. V naslednjem razdelku bomo opisali, kako ustvariti sintetično različico tega niza podatkov.



Predogled rezultatov poizvedbe.

Sintetizirajte funkcije z API-ji Gretel

 
 
Za generiranje sintetične različice vsake funkcije moramo najprej usposobiti sintetični model in nato zagnati model za generiranje sintetičnih zapisov. Gretel ima nabor Python SDK-jev, ki olajšajo integracijo v naloge Airflow.

Poleg odjemalskih SDK-jev Python smo ustvarili Kavelj Gretel Airflow ki upravlja povezave in skrivnosti Gretel API. Po nastavitvi povezave Gretel Airflow je povezava z vmesnikom Gretel API tako enostavna kot

iz hooks.gretel import GretelHook gretel = GretelHook()
projekt = gretel.get_project()


Za več informacij o tem, kako konfigurirati povezave Airflow, si oglejte naš repozitorij Github README.

Spremenljivko projekta v zgornjem primeru je mogoče uporabiti kot glavno vstopno točko za usposabljanje in izvajanje sintetičnih modelov z uporabo Gretelinega API-ja. Za več podrobnosti si lahko ogledate naše Dokumenti Python API.

Če se vrnemo nazaj na cevovod rezervacij, bomo zdaj pregledali nalogo generate_synthetic_features. Ta korak je odgovoren za usposabljanje sintetičnega modela z uporabo funkcij, pridobljenih v prejšnji nalogi.

@task()
def generate_synthetic_features(data_source: str) -> str: project = gretel.get_project() model = project.create_model_obj( model_config="synthetics/default", data_source=s3.download_file(data_source) ) model.submit_cloud() poll(model) vrni model.get_artifact_link("data_preview")


Če pogledate podpis metode, boste videli, da ima pot, data_source. Ta vrednost kaže na funkcije S3, pridobljene v prejšnjem koraku. V kasnejšem razdelku se bomo sprehodili skozi to, kako so vsi ti vhodi in izhodi povezani skupaj.

Pri ustvarjanju modela s project.create_model_obj parameter model_config predstavlja konfiguracijo sintetičnega modela, uporabljenega za generiranje modela. V tem procesu uporabljamo naše privzeta konfiguracija modela, ampak mnogi drugi konfiguracijske možnosti so na voljo.

Ko je model konfiguriran, pokličemo model.submit_cloud(). To bo predložilo model za usposabljanje in ustvarjanje zapisov z uporabo Gretel Cloud. Klicanje poll(model) bo blokiralo opravilo, dokler model ne zaključi usposabljanja.

Zdaj, ko je bil model usposobljen, bomo uporabili get_artifact_link za vrnitev povezave za prenos ustvarjenih sintetičnih funkcij.



Predogled podatkov sintetičnega nabora funkcij.

 

Ta povezava artefakta bo uporabljena kot vhod v končni korak upload_synthetic_features.

Naloži sintetične lastnosti

 
 
Izvirne lastnosti so bile ekstrahirane in ustvarjena je bila sintetična različica. Zdaj je čas, da naložite sintetične funkcije, da bodo do njih lahko dostopali nadaljnji potrošniki. V tem primeru bomo kot končni cilj za nabor podatkov uporabili vedro S3.

@task()
def upload_synthetic_features(data_set: str): context = get_current_context() with open(data_set, "rb") kot synth_features: s3.load_file_obj( file_obj=synth_features, key=f"{..._booking_features_synthetic.csv", )


Ta naloga je precej enostavna. Vhodna vrednost data_set vsebuje podpisano povezavo HTTP za prenos sintetičnega nabora podatkov iz Gretelinega API-ja. Naloga bo to datoteko prebrala v delavca Airflow in nato uporabila že konfiguriran kavelj S3 za nalaganje sintetične datoteke s funkcijami v vedro S3, kjer lahko do nje dostopajo uporabniki ali modeli na nižji stopnji.

Orkestriranje cevovoda

 
 
V zadnjih treh razdelkih smo se sprehodili skozi vso kodo, ki je potrebna za ekstrahiranje, sintetiziranje in nalaganje nabora podatkov. Zadnji korak je povezati vsako od teh nalog v en sam cevovod Airflow.

Če se spomnite začetka te objave, smo na kratko omenili koncept DAG. Z uporabo TaskFlow API-ja Airflow lahko te tri metode Python sestavimo v DAG, ki definira vhode, izhode in vrstni red izvajanja vsakega koraka.

feature_path = extract_features( "/opt/airflow/dags/sql/session_rollups__by_user.sql")
sintetični_podatki = generiraj_sintetične_features(feature_path)
upload_synthetic_features(sintetični_podatki)


Če sledite poti teh klicev metod, boste sčasoma dobili graf, ki je videti kot naš prvotni cevovod funkcij.



Sintetični cevovod Gretel na Airflow.

 

Če želite zagnati ta cevovod in ga videti v akciji, pojdite na spremlja Github repozitorij. Tam boste našli navodila, kako zagnati primerek Airflow in zagnati cevovod od konca do konca.

Zavijanje stvari

 
 
Če ste prišli tako daleč, ste videli, kako je mogoče Gretel vključiti v podatkovni cevovod, zgrajen na Airflowu. Z združevanjem Gretelovih razvijalcem prijaznih API-jev in zmogljivega sistema kavljev in operaterjev Airflow je enostavno zgraditi cevovode ETL, ki naredijo podatke bolj dostopne in varnejše za uporabo.

Govorili smo tudi o običajnem primeru uporabe inženiringa funkcij, kjer občutljivi podatki morda niso lahko dostopni. Z generiranjem sintetične različice nabora podatkov zmanjšamo tveganje za razkritje občutljivih podatkov, vendar še vedno ohranimo uporabnost nabora podatkov, hkrati pa ga damo hitro na voljo tistim, ki ga potrebujejo.

Če razmišljamo o cevovodu funkcij v bolj abstraktnem smislu, imamo zdaj vzorec, ki ga je mogoče spremeniti za poljubno število novih poizvedb SQL. Z uvedbo nove različice cevovoda in zamenjavo začetne poizvedbe SQL lahko katero koli potencialno občutljivo poizvedbo postavimo na sintetični nabor podatkov, ki ohranja zasebnost strank. Edina vrstica kode, ki jo je treba spremeniti, je pot do datoteke sql. Zapleten podatkovni inženiring ni potreben.

Hvala za branje

 
 
Pošljite nam e-pošto na hi@gretel.ai ali pa se nam pridružite Slack če imate kakršna koli vprašanja ali komentarje. Radi bi slišali, kako uporabljate Airflow in kako se lahko najbolje integriramo z vašimi obstoječimi podatkovnimi cevovodi.

 
Bio: Drew Newberry je programski inženir pri Gretel.ai.

prvotni. Poročeno z dovoljenjem.

Povezano:

Vir: https://www.kdnuggets.com/2021/09/build-synthetic-data-pipeline-gretel-apache-airflow.html

Časovni žig:

Več od KDnuggets