Bygg en syntetisk datarørledning ved hjelp av Gretel og Apache Airflow

Kilde node: 1068200

Bygg en syntetisk datarørledning ved hjelp av Gretel og Apache Airflow

I dette blogginnlegget bygger vi en ETL-pipeline som genererer syntetiske data fra en PostgreSQL-database ved hjelp av Gretels Synthetic Data APIs og Apache Airflow.


By Drew Newberry, programvareingeniør hos Gretel.ai

Bygg en syntetisk datarørledning ved hjelp av Gretel og Apache Airflow

Hei folkens, jeg heter Drew, og jeg er programvareingeniør her hos Gretel. Jeg har nylig tenkt på mønstre for å integrere Gretel APIer i eksisterende verktøy, slik at det er enkelt å bygge datapipelines der sikkerhet og kundepersonvern er førsteklasses funksjoner, ikke bare en ettertanke eller boks å sjekke.

Et datateknikkverktøy som er populært blant Gretel-ingeniører og kunder er Apache Airflow. Det tilfeldigvis fungerer også bra med Gretel. I dette blogginnlegget viser vi deg hvordan du bygger en syntetisk datapipeline ved hjelp av Airflow, Gretel og PostgreSQL. La oss hoppe inn!

Hva er luftstrøm

 
 
Luftstrøm er et automatiseringsverktøy for arbeidsflyt som vanligvis brukes til å bygge datapipelines. Det gjør det mulig for dataingeniører eller dataforskere å programmere definere og distribuere disse rørledningene ved hjelp av Python og andre kjente konstruksjoner. I kjernen av Airflow er konseptet med en DAG, eller rettet asyklisk graf. En Airflow DAG gir en modell og et sett med APIer for å definere rørledningskomponenter, deres avhengigheter og utførelsesrekkefølge.

Du kan finne Airflow-rørledninger som replikerer data fra en produktdatabase til et datavarehus. Andre rørledninger kan utføre spørringer som slår sammen normaliserte data til et enkelt datasett som er egnet for analyser eller modellering. Enda en pipeline kan publisere en daglig rapport som samler viktige forretningsberegninger. Et felles tema som deles blant disse brukssakene: koordinering av bevegelse av data på tvers av systemer. Det er her Airflow skinner.

Utnytte Airflow og dets rike økosystem av integrasjoner, kan dataingeniører og forskere orkestrere et hvilket som helst antall forskjellige verktøy eller tjenester i en enkelt enhetlig pipeline som er enkel å vedlikeholde og betjene. Med en forståelse av disse integreringsmulighetene, vil vi nå begynne å snakke om hvordan Gretel kan integreres i en Airflow-pipeline for å forbedre vanlige dataoperasjonsarbeidsflyter.

Hvordan passer Gretel inn?

 
 
Hos Gretel er vårt oppdrag å gjøre data enklere og tryggere å jobbe med. Når vi snakker med kunder, er et smertepunkt vi ofte hører om tiden og innsatsen som kreves for å få dataforskere tilgang til sensitive data. Ved hjelp av Gretel Syntetikk, kan vi redusere risikoen for å arbeide med sensitive data ved å generere en syntetisk kopi av datasettet. Ved å integrere Gretel med Airflow, er det mulig å lage selvbetjente rørledninger som gjør det enkelt for dataforskere å raskt få dataene de trenger uten å kreve en dataingeniør for hver ny dataforespørsel.

For å demonstrere disse egenskapene bygger vi en ETL-pipeline som trekker ut brukeraktivitetsfunksjoner fra en database, genererer en syntetisk versjon av datasettet og lagrer datasettet til S3. Med det syntetiske datasettet lagret i S3, kan det deretter brukes av dataforskere til nedstrøms modellering eller analyse uten at det går på bekostning av kundenes personvern.

For å sette i gang, la oss først ta et fugleperspektiv av rørledningen. Hver node i dette diagrammet representerer et rørledningstrinn, eller "oppgave" i luftstrømtermer.



Eksempel på Gretel syntetisk rørledning på Airflow.

 

Vi kan dele rørledningen opp i 3 trinn, lik det du kan finne i en ETL-rørledning:

  • Pakk – Extract_features-oppgaven vil spørre etter en database og transformere dataene til et sett med funksjoner som kan brukes av dataforskere til å bygge modeller.
  • syntetisere – generate_synthetic_features tar de utpakkede funksjonene som input, trener opp en syntetisk modell og genererer deretter et syntetisk sett med funksjoner ved hjelp av Gretel APIer og skytjenester.
  • Laste – upload_synthetic_features lagrer det syntetiske settet med funksjoner til S3 der det kan integreres i en hvilken som helst nedstrømsmodell eller analyse.

I de neste avsnittene vil vi dykke inn i hvert av disse tre trinnene mer detaljert. Hvis du ønsker å følge med på hvert kodeeksempel, kan du gå over til gretelai/gretel-luftstrøm-rørledninger og last ned all koden som brukes i dette blogginnlegget. Repoen inneholder også instruksjoner du kan følge for å starte en Airflow-forekomst og kjøre rørledningen ende til ende.

I tillegg kan det være nyttig å se Airflow-rørledningen i sin helhet før vi dissekerer hver komponent, dags/airbnb_user_bookings.py. Kodebitene i de følgende delene er hentet fra den koblede brukerbestillingspipelinen.

Trekk ut funksjoner

 
 
Den første oppgaven, extract_features, er ansvarlig for å trekke ut rådata fra kildedatabasen og transformere den til et sett med funksjoner. Dette er en vanlig funksjonsteknikk problem du kan finne i enhver maskinlærings- eller analysepipeline.

I vår eksempelpipeline vil vi klargjøre en PostgreSQL-database og laste den med bestillingsdata fra en Airbnb Kaggle-konkurranse.

Dette datasettet inneholder to tabeller, brukere og økter. Sessions inneholder en fremmednøkkelreferanse, user_id. Ved å bruke dette forholdet vil vi lage et sett med funksjoner som inneholder ulike bestillingsverdier samlet etter bruker. Følgende figur representerer SQL-spørringen som brukes til å bygge funksjonene.

WITH session_features_by_user AS ( SELECT user_id, count(*) AS number_of_actions_taken, count(DISTINCT action_type) AS number_of_unique_actions, round(avg(secs_elapsed)) AS avg_session_time_seconds, round(max_secsdssds)min (sekunder_forløpt)) AS min_session_time_seconds, ( SELECT count(*) FROM sessions s WHERE s.user_id = user_id AND s.action_type = 'booking_request') AS total_bookings FROM sessions GROUP BY user_id ) SELECT u.id AS user_id, u.gender, u.age, u .language, u.signup_method, u.date_account_created, s.number_of_actions_taken, s.number_of_unique_actions, s.avg_session_time_seconds, s.min_session_time_seconds, s.max_session_time_seconds ON . id = s.user_id LIMIT 5000


SQL-spørringen blir deretter utført fra vår Airflow-pipeline og skrevet til et mellomliggende S3-sted ved å bruke følgende oppgavedefinisjon.

@task() def extract_features(sql_file: str) -> str: context = get_current_context() sql_query = Path(sql_file).read_text() nøkkel = f"{context['dag_run'].run_id}_booking_features.csv" med NamedTemporaryFi (mode="r+", suffiks=".csv") som tmp_csv: postgres.copy_expert( f"copy ({sql_query}) til stdout med csv header", tmp_csv.name ) s3.load_file( filename=tmp_csv.name, nøkkel=nøkkel, ) returnøkkel


Inndataene til oppgaven, sql_file, bestemmer hvilken spørring som skal kjøres på databasen. Denne spørringen vil bli lest inn til oppgaven og deretter utført mot databasen. Resultatene av spørringen vil da bli skrevet til S3 og den eksterne filnøkkelen vil bli returnert som en utdata for oppgaven.

Skjermbildet nedenfor viser et eksempelresultatsett av utvinningsspørringen ovenfra. Vi vil beskrive hvordan du lager en syntetisk versjon av dette datasettet i neste avsnitt.



Forhåndsvisning av søkeresultat.

Syntetiser funksjoner ved hjelp av Gretel APIer

 
 
For å generere en syntetisk versjon av hver funksjon, må vi først trene en syntetisk modell, og deretter kjøre modellen for å generere syntetiske poster. Gretel har et sett med Python SDK-er som gjør det enkelt å integrere i Airflow-oppgaver.

I tillegg til Python Client SDK-ene har vi laget en Gretel Airflow Hook som administrerer Gretel API-tilkoblinger og hemmeligheter. Etter å ha satt opp en Gretel Airflow Connection, er det like enkelt å koble til Gretel API

fra hooks.gretel import GretelHook gretel = GretelHook() project = gretel.get_project()


For mer informasjon om hvordan du konfigurerer Airflow-tilkoblinger, se vårt Github-lager README.

Prosjektvariabelen i eksemplet ovenfor kan brukes som hovedinngangspunkt for trening og kjøring av syntetiske modeller ved bruk av Gretels API. For mer informasjon, kan du sjekke ut vår Python API-dokumenter.

Med henvisning tilbake til bestillingspipelinen vil vi nå gjennomgå oppgaven gener_synthetic_features. Dette trinnet er ansvarlig for å trene den syntetiske modellen ved å bruke funksjonene hentet ut i forrige oppgave.

@task() def gener_synthetic_features(data_source: str) -> str: project = gretel.get_project() model = project.create_model_obj( model_config="synthetics/default", data_source=s3.download_file(data_source) ) model.submit_cloud() poll(modell) returner model.get_artifact_link("data_preview")


Når du ser på metodesignaturen, vil du se at den tar en bane, data_source. Denne verdien peker på S3-funksjonene som ble hentet ut i forrige trinn. I et senere avsnitt skal vi gå gjennom hvordan alle disse inngangene og utgangene er koblet sammen.

Når du oppretter modellen ved hjelp av project.create_model_obj, representerer model_config-parameteren den syntetiske modellkonfigurasjonen som brukes til å generere modellen. I denne rørledningen bruker vi vår standard modellkonfig, men mange andre konfigurasjonsalternativer er tilgjengelige.

Etter at modellen er konfigurert kaller vi model.submit_cloud(). Dette vil sende inn modellen for trening og rekordgenerering ved bruk av Gretel Cloud. Å ringe poll(modell) vil blokkere oppgaven til modellen har fullført opplæring.

Nå som modellen er opplært, bruker vi get_artifact_link for å returnere en lenke for å laste ned de genererte syntetiske funksjonene.



Dataforhåndsvisning av det syntetiske settet med funksjoner.

 

Denne artefaktlenken vil bli brukt som input til det siste upload_synthetic_features-trinnet.

Last inn syntetiske funksjoner

 
 
De originale funksjonene er trukket ut, og en syntetisk versjon er laget. Nå er det på tide å laste opp de syntetiske funksjonene slik at de kan nås av nedstrømsforbrukere. I dette eksemplet skal vi bruke en S3-bøtte som den endelige destinasjonen for datasettet.

@task() def upload_synthetic_features(data_set: str): context = get_current_context() med open(data_set, "rb") som synth_features: s3.load_file_obj( file_obj=synth_features, key=f"{..._booking_features_synthetic.csv", )


Denne oppgaven er ganske grei. Data_set-inndataverdien inneholder en signert HTTP-kobling for å laste ned det syntetiske datasettet fra Gretels API. Oppgaven vil lese den filen inn i Airflow-arbeideren, og deretter bruke den allerede konfigurerte S3-kroken til å laste opp den syntetiske funksjonsfilen til en S3-bøtte der nedstrømsforbrukere eller modeller kan få tilgang til den.

Orkestrere rørledningen

 
 
I løpet av de tre siste delene har vi gått gjennom all koden som kreves for å trekke ut, syntetisere og laste et datasett. Det siste trinnet er å knytte hver av disse oppgavene sammen til en enkelt Airflow-rørledning.

Hvis du husker tilbake til begynnelsen av dette innlegget, nevnte vi kort konseptet med en DAG. Ved å bruke Airflows TaskFlow API kan vi komponere disse tre Python-metodene til en DAG som definerer inngangene, utgangene og rekkefølgen for hvert trinn som skal kjøres.

feature_path = extract_features( "/opt/airflow/dags/sql/session_rollups__by_user.sql" ) synthetic_data = gener_synthetic_features(feature_path) upload_synthetic_features(synthetic_data)


Hvis du følger banen til disse metodekallene, vil du til slutt få en graf som ser ut som vår originale funksjonspipeline.



Gretel syntetisk rørledning på Airflow.

 

Hvis du vil kjøre denne rørledningen og se den i aksjon, gå over til medfølgende Github-depot. Der finner du instruksjoner om hvordan du starter en Airflow-forekomst og kjører rørledningen ende til ende.

Pakke opp ting

 
 
Hvis du har kommet så langt, har du sett hvordan Gretel kan integreres i en datapipeline bygget på Airflow. Ved å kombinere Gretels utviklervennlige APIer og Airflows kraftige system med kroker og operatører er det enkelt å bygge ETL-rørledninger som gjør data mer tilgjengelig og tryggere å bruke.

Vi snakket også om en vanlig funksjonsteknikk der sensitive data kanskje ikke er lett tilgjengelige. Ved å generere en syntetisk versjon av datasettet reduserer vi risikoen for å avsløre eventuelle sensitive data, men beholder likevel nytten av datasettet samtidig som vi gjør det raskt tilgjengelig for de som trenger det.

Når vi tenker på funksjonspipelinen i mer abstrakte termer, har vi nå et mønster som kan brukes på nytt for et hvilket som helst antall nye SQL-spørringer. Ved å distribuere en ny versjon av pipelinen og bytte ut den første SQL-spørringen, kan vi fronte enhver potensielt sensitiv spørring med et syntetisk datasett som bevarer kundenes personvern. Den eneste kodelinjen som må endres er banen til sql-filen. Ingen kompleks datateknikk kreves.

Takk for at du leste

 
 
Send oss ​​en e-post kl hei@gretel.ai eller bli med oss Slack hvis du har spørsmål eller kommentarer. Vi vil gjerne høre hvordan du bruker Airflow og hvordan vi best kan integreres med eksisterende datarørledninger.

 
Bio: Drew Newberry er programvareingeniør hos Gretel.ai.

original. Ompostet med tillatelse.

Relatert:

Kilde: https://www.kdnuggets.com/2021/09/build-synthetic-data-pipeline-gretel-apache-airflow.html

Tidstempel:

Mer fra KDnuggets