Bygg en syntetisk datapipeline med Gretel och Apache Airflow

Källnod: 1068200

Bygg en syntetisk datapipeline med Gretel och Apache Airflow

I det här blogginlägget bygger vi en ETL-pipeline som genererar syntetisk data från en PostgreSQL-databas med hjälp av Gretels Synthetic Data APIs och Apache Airflow.


By Drew Newberry, mjukvaruingenjör på Gretel.ai

Bygg en syntetisk datapipeline med Gretel och Apache Airflow

Hej gott folk, jag heter Drew och jag är mjukvaruingenjör här på Gretel. Jag har nyligen funderat på mönster för att integrera Gretels API:er i befintliga verktyg så att det är enkelt att bygga datapipelines där säkerhet och kundernas integritet är förstklassiga funktioner, inte bara en eftertanke eller ruta att kontrollera.

Ett datateknikverktyg som är populärt bland Gretels ingenjörer och kunder är Apache Airflow. Det råkar också fungera utmärkt med Gretel. I det här blogginlägget visar vi dig hur du bygger en syntetisk datapipeline med Airflow, Gretel och PostgreSQL. Låt oss hoppa in!

Vad är luftflöde

 
 
Luftflöde är ett verktyg för automatisering av arbetsflöden som vanligtvis används för att bygga datapipelines. Det gör det möjligt för dataingenjörer eller datavetare att programmerat definiera och distribuera dessa pipelines med Python och andra välbekanta konstruktioner. Kärnan i Airflow är konceptet med en DAG, eller riktad acyklisk graf. En Airflow DAG tillhandahåller en modell och en uppsättning API:er för att definiera pipelinekomponenter, deras beroenden och exekveringsordning.

Du kan hitta Airflow-pipelines som replikerar data från en produktdatabas till ett datalager. Andra pipelines kan exekvera frågor som sammanfogar normaliserade data till en enda datauppsättning lämplig för analys eller modellering. Ytterligare en pipeline kan publicera en daglig rapport som sammanställer viktiga affärsmått. Ett gemensamt tema som delas bland dessa användningsfall: samordning av rörelsen av data över system. Det är här Airflow lyser.

Utnyttja luftflödet och dess rika ekosystem av integrationer, kan dataingenjörer och forskare orkestrera valfritt antal olika verktyg eller tjänster i en enda enhetlig pipeline som är lätt att underhålla och använda. Med en förståelse för dessa integrationsmöjligheter kommer vi nu att börja prata om hur Gretel kan integreras i en Airflow-pipeline för att förbättra vanliga dataoperationsarbetsflöden.

Hur passar Gretel in?

 
 
På Gretel är vårt uppdrag att göra data enklare och säkrare att arbeta med. När vi pratar med kunder är en smärtpunkt vi ofta hör om den tid och ansträngning som krävs för att få datavetare tillgång till känslig data. Använder sig av Gretel Syntet, kan vi minska risken för att arbeta med känslig data genom att generera en syntetisk kopia av datamängden. Genom att integrera Gretel med Airflow är det möjligt att skapa självbetjäningspipelines som gör det enkelt för datavetare att snabbt få den data de behöver utan att behöva en dataingenjör för varje ny dataförfrågan.

För att demonstrera dessa funktioner kommer vi att bygga en ETL-pipeline som extraherar användaraktivitetsfunktioner från en databas, genererar en syntetisk version av datamängden och sparar datasetet till S3. Med den syntetiska datamängden sparad i S3 kan den sedan användas av datavetare för nedströmsmodellering eller analys utan att äventyra kundernas integritet.

För att sätta igång, låt oss först ta ett fågelperspektiv av rörledningen. Varje nod i detta diagram representerar ett pipelinesteg, eller "uppgift" i luftflödestermer.



Exempel på Gretel syntetisk pipeline på Airflow.

 

Vi kan dela upp pipelinen i tre steg, liknande vad du kan hitta i en ETL-pipeline:

  • Utdrag – Uppgiften extract_features kommer att fråga efter en databas och omvandla data till en uppsättning funktioner som kan användas av dataforskare för att bygga modeller.
  • syntetisera – gener_synthetic_features kommer att ta de extraherade funktionerna som input, träna en syntetisk modell och sedan generera en syntetisk uppsättning funktioner med hjälp av Gretel API:er och molntjänster.
  • Ladda – upload_synthetic_features sparar den syntetiska uppsättningen funktioner till S3 där den kan tas in i valfri nedströmsmodell eller analys.

I de kommande avsnitten kommer vi att dyka in i vart och ett av dessa tre steg mer detaljerat. Om du vill följa med varje kodexempel kan du gå till gretelai/gretel-luftflödesrörledningar och ladda ner all kod som används i det här blogginlägget. Repet innehåller också instruktioner som du kan följa för att starta en Airflow-instans och köra rörledningen från början till slut.

Dessutom kan det vara bra att se luftflödesledningen i sin helhet innan vi dissekerar varje komponent, dags/airbnb_user_bookings.py. Kodavsnitten i följande avsnitt extraheras från den länkade användarbokningspipelinen.

Extrahera funktioner

 
 
Den första uppgiften, extract_features, är ansvarig för att extrahera rådata från källdatabasen och omvandla den till en uppsättning funktioner. Detta är en vanlig funktionsteknik problem du kan hitta i vilken maskininlärnings- eller analyspipeline som helst.

I vår exempelpipeline kommer vi att tillhandahålla en PostgreSQL-databas och ladda den med bokningsdata från en Airbnb Kaggle-tävling.

Denna datauppsättning innehåller två tabeller, användare och sessioner. Sessioner innehåller en främmande nyckelreferens, user_id. Genom att använda denna relation skapar vi en uppsättning funktioner som innehåller olika bokningsstatistik sammanställda av användare. Följande figur representerar SQL-frågan som används för att bygga funktionerna.

WITH session_features_by_user AS ( SELECT user_id, count(*) AS number_of_actions_taken, count(DISTINCT action_type) AS number_of_unique_actions, round(avg(secs_elapsed)) AS avg_session_time_seconds, round(max_secseds)sessions(secs_sessions)min min_session_time_seconds, ( SELECT count(*) FROM sessions s WHERE s.user_id = user_id AND s.action_type = 'booking_request') AS total_bookings FROM sessions GROUP BY user_id ) SELECT u.id AS user_id, u.gender, u.age, u .language, u.signup_method, u.date_account_created, s.number_of_actions_taken, s.number_of_unique_actions, s.avg_session_time_seconds, s.min_session_time_seconds, s.max_session_time_seconds FROM uer 5000


SQL-frågan exekveras sedan från vår Airflow-pipeline och skrivs till en mellanliggande S3-plats med hjälp av följande uppgiftsdefinition.

@task() def extract_features(sql_file: str) -> str: context = get_current_context() sql_query = Path(sql_file).read_text() nyckel = f"{context['dag_run'].run_id}_booking_features.csv" med NamedTemporaryFi (mode="r+", suffix=".csv") som tmp_csv: postgres.copy_expert( f"copy ({sql_query}) till stdout med csv header", tmp_csv.name ) s3.load_file( filnamn=tmp_csv.name, nyckel=nyckel, ) returnyckel


Indata till uppgiften, sql_file, bestämmer vilken fråga som ska köras på databasen. Denna fråga kommer att läsas in i uppgiften och exekveras sedan mot databasen. Resultaten av frågan kommer sedan att skrivas till S3 och fjärrfilnyckeln kommer att returneras som en utdata för uppgiften.

Skärmdumpen nedan visar ett exempel på resultatuppsättningen av extraktionsfrågan ovanifrån. Vi kommer att beskriva hur man skapar en syntetisk version av denna datauppsättning i nästa avsnitt.



Förhandsgranskning av frågeresultat.

Syntetisera funktioner med Gretel API:er

 
 
För att generera en syntetisk version av varje funktion måste vi först träna en syntetisk modell och sedan köra modellen för att generera syntetiska poster. Gretel har en uppsättning Python SDK:er som gör det enkelt att integrera i Airflow-uppgifter.

Utöver Python Client SDK har vi skapat en Gretel Airflow Hook som hanterar Gretel API-anslutningar och hemligheter. Efter att ha konfigurerat en Gretel Airflow Connection är det lika enkelt att ansluta till Gretel API

från hooks.gretel import GretelHook gretel = GretelHook() project = gretel.get_project()


För mer information om hur du konfigurerar Airflow-anslutningar, se vårt Github-förråd README.

Projektvariabeln i exemplet ovan kan användas som huvudstartpunkt för träning och körning av syntetiska modeller med hjälp av Gretels API. För mer information kan du kolla in vår Python API-dokument.

Med hänvisning till bokningspipelinen kommer vi nu att granska uppgiften gener_synthetic_features. Detta steg är ansvarigt för att träna den syntetiska modellen med hjälp av funktionerna som extraherades i föregående uppgift.

@task() def gener_synthetic_features(data_source: str) -> str: project = gretel.get_project() model = project.create_model_obj( model_config="synthetics/default", data_source=s3.download_file(data_source) ) model.submit_cloud() poll(model) return model.get_artifact_link("data_preview")


När du tittar på metodsignaturen kommer du att se att den tar en sökväg, data_source. Detta värde pekar på S3-funktionerna som extraherades i föregående steg. I ett senare avsnitt kommer vi att gå igenom hur alla dessa ingångar och utgångar kopplas samman.

När du skapar modellen med project.create_model_obj representerar parametern model_config den syntetiska modellkonfigurationen som används för att generera modellen. I denna pipeline använder vi vår standardmodellkonfiguration, men många andra konfigurationsalternativ finns tillgängliga.

Efter att modellen har konfigurerats anropar vi model.submit_cloud(). Detta kommer att skicka in modellen för träning och rekordgenerering med Gretel Cloud. Att anropa poll(model) kommer att blockera uppgiften tills modellen har slutfört utbildningen.

Nu när modellen har tränats kommer vi att använda get_artifact_link för att returnera en länk för att ladda ner de genererade syntetiska funktionerna.



Dataförhandsvisning av den syntetiska uppsättningen funktioner.

 

Denna artefaktlänk kommer att användas som en input till det sista upload_synthetic_features-steget.

Ladda syntetiska funktioner

 
 
De ursprungliga funktionerna har extraherats och en syntetisk version har skapats. Nu är det dags att ladda upp de syntetiska funktionerna så att de kan nås av nedströmskonsumenter. I det här exemplet kommer vi att använda en S3-hink som slutdestination för datamängden.

@task() def upload_synthetic_features(data_set: str): context = get_current_context() med open(data_set, "rb") som synth_features: s3.load_file_obj( file_obj=synth_features, key=f"{..._booking_features_synthetic.csv", )


Denna uppgift är ganska enkel. Data_set-inmatningsvärdet innehåller en signerad HTTP-länk för att ladda ner den syntetiska datamängden från Gretels API. Uppgiften kommer att läsa in den filen i Airflow-arbetaren och sedan använda den redan konfigurerade S3-kroken för att ladda upp den syntetiska funktionsfilen till en S3-hink där nedströmskonsumenter eller modeller kan komma åt den.

Orkestera rörledningen

 
 
Under de senaste tre avsnitten har vi gått igenom all kod som krävs för att extrahera, syntetisera och ladda en datauppsättning. Det sista steget är att knyta samman var och en av dessa uppgifter till en enda Airflow-pipeline.

Om du kommer ihåg början av det här inlägget nämnde vi kort konceptet med en DAG. Med hjälp av Airflows TaskFlow API kan vi komponera dessa tre Python-metoder till en DAG som definierar ingångar, utgångar och ordning varje steg kommer att köras.

feature_path = extract_features( "/opt/airflow/dags/sql/session_rollups__by_user.sql" ) synthetic_data = generera_syntetiska_funktioner(funktionssökväg) upload_syntetiska_funktioner(syntetiska_data)


Om du följer vägen för dessa metodanrop kommer du så småningom att få en graf som ser ut som vår ursprungliga funktionspipeline.



Gretel syntetisk pipeline på Airflow.

 

Om du vill köra denna pipeline och se den i aktion, gå över till medföljande Github-förråd. Där hittar du instruktioner om hur du startar en Airflow-instans och kör pipeline från början till slut.

Förpackning saker upp

 
 
Om du har kommit så långt har du sett hur Gretel kan integreras i en datapipeline byggd på Airflow. Genom att kombinera Gretels utvecklarvänliga API:er och Airflows kraftfulla system av krokar och operatörer är det enkelt att bygga ETL-pipelines som gör data mer tillgänglig och säkrare att använda.

Vi pratade också om ett vanligt användningsfall där känsliga data kanske inte är lättillgängliga. Genom att generera en syntetisk version av datamängden minskar vi risken för att exponera eventuella känsliga uppgifter, men behåller ändå användbarheten av datasetet samtidigt som vi gör det snabbt tillgängligt för dem som behöver det.

När vi tänker på funktionspipelinen i mer abstrakta termer, har vi nu ett mönster som kan återanvändas för valfritt antal nya SQL-frågor. Genom att distribuera en ny version av pipelinen och byta ut den ursprungliga SQL-frågan kan vi fronta alla potentiellt känsliga frågor med en syntetisk datauppsättning som bevarar kundernas integritet. Den enda kodraden som behöver ändras är sökvägen till sql-filen. Ingen komplex datateknik krävs.

Tack för att du läser

 
 
Skicka ett mail till oss hej@gretel.ai eller kom med oss Slak om du har några frågor eller kommentarer. Vi vill gärna höra hur du använder Airflow och hur vi bäst kan integrera med dina befintliga datapipelines.

 
Bio: Drew Newberry är mjukvaruingenjör på Gretel.ai.

Ursprungliga. Skickas om med tillstånd.

Relaterat:

Källa: https://www.kdnuggets.com/2021/09/build-synthetic-data-pipeline-gretel-apache-airflow.html

Tidsstämpel:

Mer från KDnuggets