Byg en syntetisk datapipeline ved hjælp af Gretel og Apache Airflow

Kildeknude: 1068200

Byg en syntetisk datapipeline ved hjælp af Gretel og Apache Airflow

I dette blogindlæg bygger vi en ETL-pipeline, der genererer syntetiske data fra en PostgreSQL-database ved hjælp af Gretels Synthetic Data API'er og Apache Airflow.


By Drew Newberry, Software Engineer hos Gretel.ai

Byg en syntetisk datapipeline ved hjælp af Gretel og Apache Airflow

Hej folkens, mit navn er Drew, og jeg er softwareingeniør her hos Gretel. Jeg har for nylig tænkt på mønstre til at integrere Gretel API'er i eksisterende værktøjer, så det er nemt at bygge datapipelines, hvor sikkerhed og kundernes privatliv er førsteklasses funktioner, ikke bare en eftertanke eller boks, der skal tjekkes.

Et dataingeniørværktøj, der er populært blandt Gretel-ingeniører og kunder, er Apache Airflow. Det fungerer tilfældigvis også godt med Gretel. I dette blogindlæg viser vi dig, hvordan du bygger en syntetisk datapipeline ved hjælp af Airflow, Gretel og PostgreSQL. Lad os springe ind!

Hvad er luftstrøm

 
 
Luftmængde er et workflow-automatiseringsværktøj, der almindeligvis bruges til at bygge datapipelines. Det gør det muligt for dataingeniører eller dataforskere at programmere definere og implementere disse pipelines ved hjælp af Python og andre velkendte konstruktioner. Kernen i Airflow er konceptet med en DAG, eller rettet acyklisk graf. En Airflow DAG giver en model og et sæt af API'er til at definere pipeline-komponenter, deres afhængigheder og udførelsesrækkefølge.

Du kan finde Airflow-pipelines, der replikerer data fra en produktdatabase til et datavarehus. Andre pipelines kan udføre forespørgsler, der forbinder normaliserede data til et enkelt datasæt, der er egnet til analyser eller modellering. Endnu en pipeline kan udgive en daglig rapport, der samler vigtige forretningsdata. Et fælles tema deles blandt disse use cases: koordinering af bevægelsen af ​​data på tværs af systemer. Det er her Airflow skinner.

Udnyttelse af Airflow og dets rige økosystem af integrationer, kan dataingeniører og videnskabsmænd orkestrere et vilkårligt antal forskellige værktøjer eller tjenester i en enkelt samlet pipeline, der er nem at vedligeholde og betjene. Med en forståelse af disse integrationsmuligheder vil vi nu begynde at tale om, hvordan Gretel kan integreres i en Airflow-pipeline for at forbedre almindelige dataoperations-arbejdsgange.

Hvordan passer Grete ind?

 
 
Hos Gretel er vores mission at gøre data nemmere og sikrere at arbejde med. Når vi taler med kunder, er et smertepunkt, vi ofte hører om, den tid og indsats, der kræves for at få dataforskere adgang til følsomme data. Ved brug af Gretel Syntetisk, kan vi reducere risikoen for at arbejde med følsomme data ved at generere en syntetisk kopi af datasættet. Ved at integrere Gretel med Airflow er det muligt at skabe selvbetjente pipelines, der gør det nemt for dataforskere hurtigt at få de data, de har brug for, uden at kræve en dataingeniør for hver ny dataanmodning.

For at demonstrere disse muligheder bygger vi en ETL-pipeline, der udtrækker brugeraktivitetsfunktioner fra en database, genererer en syntetisk version af datasættet og gemmer datasættet til S3. Med det syntetiske datasæt gemt i S3, kan det derefter bruges af dataforskere til downstream-modellering eller analyse uden at kompromittere kundernes privatliv.

For at sætte gang i tingene, lad os først tage et fugleperspektiv af rørledningen. Hver node i dette diagram repræsenterer et pipeline-trin eller "opgave" i luftstrømstermer.



Eksempel Gretel syntetisk pipeline på Airflow.

 

Vi kan dele pipelinen op i 3 faser, svarende til hvad du kan finde i en ETL pipeline:

  • Uddrag – Opgaven extract_features vil forespørge i en database og transformere dataene til et sæt funktioner, der kan bruges af dataforskere til at bygge modeller.
  • syntetisere – gener_synthetic_features vil tage de ekstraherede funktioner som input, træne en syntetisk model og derefter generere et syntetisk sæt funktioner ved hjælp af Gretel API'er og cloud-tjenester.
  • Load – upload_synthetic_features gemmer det syntetiske sæt funktioner til S3, hvor det kan indarbejdes i enhver downstream-model eller analyse.

I de næste par afsnit vil vi dykke ned i hvert af disse tre trin mere detaljeret. Hvis du ønsker at følge med i hver kodeeksempel, kan du gå over til gretelai/gretel-luftstrømsrørledninger og download al koden brugt i dette blogindlæg. Repoen indeholder også instruktioner, du kan følge for at starte en Airflow-instans og køre rørledningen fra ende til anden.

Derudover kan det være nyttigt at se Airflow-rørledningen i sin helhed, før vi dissekerer hver komponent, dags/airbnb_user_bookings.py. Kodestykkerne i de følgende sektioner er udtrukket fra den linkede brugerbookingspipeline.

Uddrag funktioner

 
 
Den første opgave, extract_features, er ansvarlig for at udtrække rådata fra kildedatabasen og transformere dem til et sæt funktioner. Dette er en almindelig funktionsteknik problem, du kan finde i enhver maskinlærings- eller analysepipeline.

I vores eksempelpipeline vil vi klargøre en PostgreSQL-database og indlæse den med bookingdata fra en Airbnb Kaggle-konkurrence.

Dette datasæt indeholder to tabeller, Brugere og Sessioner. Sessioner indeholder en fremmednøglereference, user_id. Ved at bruge dette forhold opretter vi et sæt funktioner, der indeholder forskellige booking-metrics samlet efter bruger. Følgende figur repræsenterer den SQL-forespørgsel, der bruges til at bygge funktionerne.

MED session_features_by_user AS ( SELECT user_id, count(*) AS number_of_actions_taken, count(DISTINCT action_type) AS number_of_unique_actions, round(avg(secs_elapsed)) AS avg_session_time_seconds, round(max_secsdssds)min. (secs_elapsed)) AS min_session_time_seconds, ( SELECT count(*) FROM sessions s WHERE s.user_id = user_id AND s.action_type = 'booking_request') AS total_bookings FROM sessions GROUP BY user_id ) SELECT u.id AS user_id, u.gender, u.age, u .language, u.signup_method, u.date_account_created, s.number_of_actions_taken, s.number_of_unique_actions, s.avg_session_time_seconds, s.min_session_time_seconds, s.max_session_time_seconds ON . id = s.user_id LIMIT 5000


SQL-forespørgslen udføres derefter fra vores Airflow-pipeline og skrives til en mellemliggende S3-placering ved hjælp af følgende opgavedefinition.

@task() def extract_features(sql_file: str) -> str: context = get_current_context() sql_query = Sti(sql_file).read_text() nøgle = f"{context['dag_run'].run_id}_booking_features.csv" med NamedTemporaryFi (mode="r+", suffix=".csv") som tmp_csv: postgres.copy_expert( f"copy ({sql_query}) til stdout med csv header", tmp_csv.name ) s3.load_file( filename=tmp_csv.name, nøgle=nøgle, ) returnøgle


Inputtet til opgaven, sql_file, bestemmer, hvilken forespørgsel der skal køres på databasen. Denne forespørgsel vil blive læst ind til opgaven og derefter udført mod databasen. Resultaterne af forespørgslen vil derefter blive skrevet til S3, og fjernfilnøglen vil blive returneret som et output af opgaven.

Skærmbilledet nedenfor viser et eksempel på et resultatsæt af ekstraktionsforespørgslen fra oven. Vi vil beskrive, hvordan man opretter en syntetisk version af dette datasæt i næste afsnit.



Forhåndsvisning af forespørgselsresultat.

Syntetiser funktioner ved hjælp af Gretel API'er

 
 
For at generere en syntetisk version af hver funktion skal vi først træne en syntetisk model og derefter køre modellen for at generere syntetiske poster. Gretel har et sæt Python SDK'er, der gør det nemt at integrere i Airflow-opgaver.

Ud over Python Client SDK'erne har vi lavet en Gretel luftstrømskrog der administrerer Gretel API-forbindelser og hemmeligheder. Efter opsætning af en Gretel Airflow Connection er det lige så nemt at oprette forbindelse til Gretel API

fra hooks.gretel import GretelHook gretel = GretelHook() project = gretel.get_project()


For mere information om, hvordan du konfigurerer Airflow-forbindelser, se venligst vores Github-lager README.

Projektvariablen i eksemplet ovenfor kan bruges som hovedindgangspunktet for træning og kørsel af syntetiske modeller ved hjælp af Gretels API. For flere detaljer, kan du tjekke vores Python API-dokumenter.

Med henvisning tilbage til reservationspipelinen vil vi nu gennemgå opgaven gener_synthetic_features. Dette trin er ansvarligt for at træne den syntetiske model ved hjælp af funktionerne udtrukket i den forrige opgave.

@task() def gener_synthetic_features(data_source: str) -> str: project = gretel.get_project() model = project.create_model_obj( model_config="synthetics/default", data_source=s3.download_file(data_source) ) model.submit_cloud() poll(model) returner model.get_artifact_link("data_preview")


Når du ser på metodesignaturen, vil du se, at den tager en sti, data_source. Denne værdi peger på S3-funktionerne, der blev udtrukket i det foregående trin. I et senere afsnit vil vi gennemgå, hvordan alle disse input og output er forbundet med hinanden.

Når du opretter modellen ved hjælp af project.create_model_obj, repræsenterer model_config-parameteren den syntetiske modelkonfiguration, der bruges til at generere modellen. I denne pipeline bruger vi vores standard model konfiguration, men mange andre konfigurationsmuligheder er tilgængelige.

Efter at modellen er blevet konfigureret, kalder vi model.submit_cloud(). Dette vil indsende modellen til træning og rekordgenerering ved hjælp af Gretel Cloud. Kaldning af poll(model) vil blokere opgaven, indtil modellen har gennemført træning.

Nu hvor modellen er blevet trænet, bruger vi get_artifact_link til at returnere et link for at downloade de genererede syntetiske funktioner.



Dataforhåndsvisning af det syntetiske sæt funktioner.

 

Dette artefaktlink vil blive brugt som input til det sidste upload_synthetic_features-trin.

Indlæs syntetiske funktioner

 
 
De originale funktioner er blevet udtrukket, og en syntetisk version er blevet oprettet. Nu er det tid til at uploade de syntetiske funktioner, så de kan tilgås af downstream-forbrugere. I dette eksempel vil vi bruge en S3-spand som den endelige destination for datasættet.

@task() def upload_synthetic_features(data_set: str): context = get_current_context() med open(data_set, "rb") som synth_features: s3.load_file_obj( file_obj=synth_features, key=f"{..._booking_features_synthetic.csv", )


Denne opgave er ret ligetil. Data_set-inputværdien indeholder et signeret HTTP-link til at downloade det syntetiske datasæt fra Gretels API. Opgaven vil læse den fil ind i Airflow-arbejderen og derefter bruge den allerede konfigurerede S3-hook til at uploade den syntetiske funktionsfil til en S3-bøtte, hvor downstream-forbrugere eller modeller kan få adgang til den.

Orkestrering af rørledningen

 
 
I løbet af de sidste tre sektioner har vi gennemgået al den kode, der kræves for at udtrække, syntetisere og indlæse et datasæt. Det sidste trin er at binde hver af disse opgaver sammen til en enkelt Airflow-pipeline.

Hvis du husker tilbage til begyndelsen af ​​dette indlæg, nævnte vi kort konceptet med en DAG. Ved at bruge Airflows TaskFlow API kan vi sammensætte disse tre Python-metoder til en DAG, der definerer input, output og rækkefølge, hvert trin vil blive kørt.

feature_path = extract_features( "/opt/airflow/dags/sql/session_rollups__by_user.sql" ) synthetic_data = gener_synthetic_features(feature_path) upload_synthetic_features(synthetic_data)


Hvis du følger stien til disse metodekald, vil du til sidst få en graf, der ligner vores originale feature-pipeline.



Gretel syntetisk pipeline på Airflow.

 

Hvis du vil køre denne pipeline og se den i aktion, skal du gå over til medfølgende Github-depot. Der vil du finde instruktioner om, hvordan du starter en Airflow-instans og kører rørledningen fra ende til anden.

Indpakning ting op

 
 
Hvis du er nået så langt, har du set, hvordan Gretel kan integreres i en datapipeline bygget på Airflow. Ved at kombinere Gretels udviklervenlige API'er og Airflows kraftfulde system af kroge og operatører er det nemt at bygge ETL-pipelines, der gør data mere tilgængelige og sikrere at bruge.

Vi talte også om en almindelig funktionsteknik, hvor følsomme data muligvis ikke er let tilgængelige. Ved at generere en syntetisk version af datasættet reducerer vi risikoen for at afsløre eventuelle følsomme data, men vi bevarer stadig datasættets anvendelighed, samtidig med at vi gør det hurtigt tilgængeligt for dem, der har brug for det.

Når vi tænker på funktionspipelinen i mere abstrakte termer, har vi nu et mønster, der kan genbruges til et hvilket som helst antal nye SQL-forespørgsler. Ved at implementere en ny version af pipelinen og udskifte den indledende SQL-forespørgsel, kan vi fronte enhver potentielt følsom forespørgsel med et syntetisk datasæt, der bevarer kundernes privatliv. Den eneste kodelinje, der skal ændres, er stien til sql-filen. Ingen kompleks datateknik påkrævet.

Tak for læsning

 
 
Send os en e-mail kl hej@gretel.ai eller kom og vær med Slack hvis du har spørgsmål eller kommentarer. Vi vil meget gerne høre, hvordan du bruger Airflow, og hvordan vi bedst kan integrere med dine eksisterende datapipelines.

 
Bio: Drew Newberry er softwareingeniør hos Gretel.ai.

Original. Genopslået med tilladelse.

Relateret:

Kilde: https://www.kdnuggets.com/2021/09/build-synthetic-data-pipeline-gretel-apache-airflow.html

Tidsstempel:

Mere fra KDnuggets