Bouw een synthetische datapijplijn met Gretel en Apache Airflow

Bronknooppunt: 1068200

Bouw een synthetische datapijplijn met Gretel en Apache Airflow

In deze blogpost bouwen we een ETL-pijplijn die synthetische gegevens genereert uit een PostgreSQL-database met behulp van Gretel's Synthetic Data API's en Apache Airflow.


By Dre Newberry, Software Engineer bij Gretel.ai

Bouw een synthetische datapijplijn met Gretel en Apache Airflow

Hey mensen, mijn naam is Drew, en ik ben een software engineer hier bij Gretel. Ik heb onlangs nagedacht over patronen voor het integreren van Gretel-API's in bestaande tools, zodat het gemakkelijk is om datapijplijnen te bouwen waarin beveiliging en klantprivacy eersteklas functies zijn, niet alleen een bijzaak of vakje om te controleren.

Een tool voor data-engineering die populair is onder Gretel-technici en klanten, is Apache Airflow. Het werkt ook geweldig met Gretel. In deze blogpost laten we je zien hoe je een synthetische datapijplijn bouwt met Airflow, Gretel en PostgreSQL. Laten we erin springen!

Wat is luchtstroom?

 
 
Luchtstroom is een tool voor workflowautomatisering die vaak wordt gebruikt om gegevenspijplijnen te bouwen. Het stelt data-ingenieurs of datawetenschappers in staat om deze pijplijnen programmatisch te definiëren en in te zetten met behulp van Python en andere bekende constructies. De kern van Airflow is het concept van een DAG, of gerichte acyclische grafiek. Een Airflow DAG biedt een model en een set API's voor het definiëren van pijplijncomponenten, hun afhankelijkheden en uitvoeringsvolgorde.

Mogelijk vindt u Airflow-pijplijnen die gegevens uit een productdatabase repliceren naar een datawarehouse. Andere pijplijnen kunnen query's uitvoeren die genormaliseerde gegevens samenvoegen tot een enkele gegevensset die geschikt is voor analyse of modellering. Nog een andere pijplijn zou een dagelijks rapport kunnen publiceren met belangrijke bedrijfsstatistieken. Een gemeenschappelijk thema dat door deze use-cases wordt gedeeld: het coördineren van de verplaatsing van gegevens tussen systemen. Dit is waar Airflow uitblinkt.

Gebruikmakend van Airflow en zijn rijke ecosysteem van integraties, kunnen data-ingenieurs en wetenschappers een willekeurig aantal ongelijksoortige tools of services orkestreren in een enkele uniforme pijplijn die gemakkelijk te onderhouden en te bedienen is. Nu we deze integratiemogelijkheden begrijpen, gaan we het nu hebben over hoe Gretel kan worden geïntegreerd in een Airflow-pijplijn om algemene data-ops-workflows te verbeteren.

Hoe past Grietje erin?

 
 
Bij Gretel is het onze missie om gegevens gemakkelijker en veiliger te maken om mee te werken. Als we met klanten praten, is een pijnpunt waar we vaak over horen de tijd en moeite die nodig is om datawetenschappers toegang te krijgen tot gevoelige gegevens. Gebruik makend van Grietje Synthetisch, kunnen we het risico van het werken met gevoelige gegevens verminderen door een synthetische kopie van de dataset te genereren. Door Gretel te integreren met Airflow, is het mogelijk om zelfbedieningspijplijnen te creëren die het voor datawetenschappers gemakkelijk maken om snel de data te krijgen die ze nodig hebben, zonder dat er een data-engineer nodig is voor elk nieuw dataverzoek.

Om deze mogelijkheden te demonstreren, bouwen we een ETL-pijplijn die functies voor gebruikersactiviteit extraheert uit een database, een synthetische versie van de dataset genereert en de dataset opslaat in S3. Met de synthetische dataset die is opgeslagen in S3, kan deze vervolgens door datawetenschappers worden gebruikt voor downstream-modellering of analyse zonder de privacy van de klant in gevaar te brengen.

Laten we, om te beginnen, eerst de pijpleiding in vogelvlucht bekijken. Elk knooppunt in dit diagram vertegenwoordigt een pijplijnstap, of "taak" in termen van Airflow.



Voorbeeld Gretel kunststof pijpleiding op Airflow.

 

We kunnen de pijplijn opdelen in 3 fasen, vergelijkbaar met wat je zou kunnen vinden in een ETL-pijplijn:

  • Extract – De taak extract_features zal een database doorzoeken en de gegevens omzetten in een reeks functies die door gegevenswetenschappers kunnen worden gebruikt voor het bouwen van modellen.
  • Synthetiseren – Genereer_synthetic_features zal de geëxtraheerde functies als invoer gebruiken, een synthetisch model trainen en vervolgens een synthetische set functies genereren met behulp van Gretel API's en cloudservices.
  • Laden - upload_synthetic_features slaat de synthetische set functies op naar S3 waar het kan worden opgenomen in elk downstream-model of elke analyse.

In de volgende paragrafen gaan we dieper in op elk van deze drie stappen. Als je elk codevoorbeeld wilt volgen, ga dan naar: gretelai/gretel-luchtstroom-pijpleidingen en download alle code die in deze blogpost wordt gebruikt. De repo bevat ook instructies die u kunt volgen om een ​​Airflow-instantie te starten en de pijplijn van begin tot eind uit te voeren.

Bovendien kan het handig zijn om de Airflow-pijpleiding in zijn geheel te bekijken, voordat we elk onderdeel ontleden, dags/airbnb_user_bookings.py. De codefragmenten in de volgende secties worden geëxtraheerd uit de gekoppelde gebruikersboekingspijplijn.

Functies extraheren

 
 
De eerste taak, extract_features, is verantwoordelijk voor het extraheren van onbewerkte gegevens uit de brondatabase en het omzetten ervan in een reeks functies. Dit is een veel voorkomende functie-engineering probleem dat u kunt tegenkomen in een machine learning- of analysepijplijn.

In onze voorbeeldpijplijn zullen we een PostgreSQL-database inrichten en deze laden met boekingsgegevens van een Airbnb Kaggle-wedstrijd.

Deze dataset bevat twee tabellen, Gebruikers en Sessies. Sessions bevat een refererende sleutelreferentie, user_id. Met behulp van deze relatie zullen we een set functies maken met verschillende boekingsstatistieken die zijn verzameld per gebruiker. De volgende afbeelding geeft de SQL-query weer die is gebruikt om de functies te bouwen.

WITH session_features_by_user AS (SELECT user_id, count(*) AS number_of_actions_taken, count(DISTINCT action_type) AS number_of_unique_actions, round(avg(secs_elapsed)) AS avg_session_time_seconds, round(max(secs_elapsed)_time_time_of_unique_actions, round(avg(secs_elapsed)) AS avg_session_time_seconds, round(max(secs_elapsed)_time_time_elapsed)) AS max_selapses min_session_time_seconds, ( SELECT count(*) FROM session s WHERE s.user_id = user_id AND s.action_type = 'booking_request') AS total_bookings FROM session GROUP BY user_id ) SELECT u.id AS user_id, u.gender, u.age, u .taal, u.signup_method, u.date_account_created, s.number_of_actions_taken, s.number_of_unique_actions, s.avg_session_time_seconds, s.min_session_time_seconds, s.max_session_time_seconds FROM session_features u_by JO_IN .ID gebruikers


De SQL-query wordt vervolgens uitgevoerd vanuit onze Airflow-pijplijn en geschreven naar een tussenliggende S3-locatie met behulp van de volgende taakdefinitie.

@task() def extract_features(sql_file: str) -> str: context = get_current_context() sql_query = Path(sql_file).read_text() key = f"{context['dag_run'].run_id}_booking_features.csv" met NamedTemporaryFile (mode="r+", suffix=".csv") als tmp_csv: postgres.copy_expert( f"copy ({sql_query}) naar stdout met csv-header", tmp_csv.name ) s3.load_file( filename=tmp_csv.name, key=key, ) return key


De invoer voor de taak, sql_file, bepaalt welke query moet worden uitgevoerd op de database. Deze query wordt ingelezen in de taak en vervolgens uitgevoerd op de database. De resultaten van de query worden vervolgens naar S3 geschreven en de externe bestandssleutel wordt geretourneerd als uitvoer van de taak.

De onderstaande schermafbeelding toont een voorbeeldresultatenset van de extractiequery van bovenaf. In de volgende sectie zullen we beschrijven hoe u een synthetische versie van deze dataset kunt maken.



Voorbeeld van queryresultaat.

Functies synthetiseren met behulp van Gretel API's

 
 
Om een ​​synthetische versie van elk kenmerk te genereren, moeten we eerst een synthetisch model trainen en vervolgens het model uitvoeren om synthetische records te genereren. Gretel heeft een set Python SDK's die het gemakkelijk maken om te integreren in Airflow-taken.

Naast de Python Client SDK's hebben we een Grietje Luchtstroom Haak die Gretel API-verbindingen en geheimen beheert. Na het opzetten van een Gretel Airflow Connection, is verbinding maken met de Gretel API net zo eenvoudig als:

from hooks.gretel import GretelHook gretel = GretelHook() project = gretel.get_project()


Raadpleeg onze Github-repository voor meer informatie over het configureren van Airflow-verbindingen README.

De projectvariabele in het bovenstaande voorbeeld kan worden gebruikt als het belangrijkste toegangspunt voor het trainen en uitvoeren van synthetische modellen met behulp van de API van Gretel. Voor meer details, kunt u onze Python API-documenten.

We verwijzen terug naar de boekingspijplijn en bekijken nu de taak create_synthetic_features. Deze stap is verantwoordelijk voor het trainen van het synthetische model met behulp van de functies die in de vorige taak zijn geëxtraheerd.

@task() def generation_synthetic_features(data_source: str) -> str: project = gretel.get_project() model = project.create_model_obj( model_config="synthetics/default", data_source=s3.download_file(data_source) ) model.submit_cloud() poll(model) retourneer model.get_artifact_link("data_preview")


Als u naar de handtekening van de methode kijkt, ziet u dat deze een pad nodig heeft, data_source. Deze waarde verwijst naar de S3-functies die in de vorige stap zijn geëxtraheerd. In een later gedeelte zullen we doornemen hoe al deze in- en uitgangen met elkaar zijn verbonden.

Bij het maken van het model met project.create_model_obj, vertegenwoordigt de param model_config de synthetische modelconfiguratie die wordt gebruikt om het model te genereren. In deze pijplijn gebruiken we onze standaard modelconfiguratie, maar vele andere configuratie-opties beschikbaar.

Nadat het model is geconfigureerd, noemen we model.submit_cloud(). Hiermee wordt het model ingediend voor training en het genereren van records met behulp van Gretel Cloud. Door poll(model) aan te roepen, wordt de taak geblokkeerd totdat het model de training heeft voltooid.

Nu het model is getraind, gebruiken we get_artifact_link om een ​​link te retourneren om de gegenereerde synthetische functies te downloaden.



Gegevensvoorbeeld van de synthetische set functies.

 

Deze artefactlink wordt gebruikt als invoer voor de laatste stap upload_synthetic_features.

Synthetische functies laden

 
 
De originele kenmerken zijn eruit gehaald en er is een synthetische versie gemaakt. Nu is het tijd om de synthetische functies te uploaden, zodat ze toegankelijk zijn voor downstream-consumenten. In dit voorbeeld gaan we een S3-bucket gebruiken als eindbestemming voor de dataset.

@task() def upload_synthetic_features(data_set: str): context = get_current_context() met open(data_set, "rb") als synth_features: s3.load_file_obj( file_obj=synth_features, key=f"{..._booking_features_synthetic.csv", )


Deze taak is vrij eenvoudig. De invoerwaarde data_set bevat een ondertekende HTTP-link om de synthetische dataset te downloaden van de API van Gretel. De taak zal dat bestand inlezen in de Airflow-werker en vervolgens de reeds geconfigureerde S3-hook gebruiken om het synthetische functiebestand te uploaden naar een S3-bucket waar downstream-consumenten of -modellen er toegang toe hebben.

De pijplijn orkestreren

 
 
In de laatste drie secties hebben we alle code doorgenomen die nodig is om een ​​dataset te extraheren, synthetiseren en laden. De laatste stap is om elk van deze taken samen te voegen tot een enkele Airflow-pijpleiding.

Als je je aan het begin van dit bericht herinnert, hebben we het concept van een DAG kort genoemd. Met behulp van Airflow's TaskFlow API kunnen we deze drie Python-methoden samenstellen in een DAG die de invoer, uitvoer en volgorde definieert waarin elke stap wordt uitgevoerd.

feature_path = extract_features( "/opt/airflow/dags/sql/session_rollups__by_user.sql" ) synthetic_data = generation_synthetic_features(feature_path) upload_synthetic_features(synthetic_data)


Als u het pad van deze methodeaanroepen volgt, krijgt u uiteindelijk een grafiek die eruitziet als onze oorspronkelijke functiepijplijn.



Gretel synthetische pijpleiding op Airflow.

 

Als je deze pijplijn wilt gebruiken en hem in actie wilt zien, ga dan naar de bijbehorende Github-repository. Daar vindt u instructies voor het starten van een Airflow-instantie en het van begin tot eind uitvoeren van de pijplijn.

De dingen inpakken

 
 
Als je zo ver bent gekomen, heb je gezien hoe Gretel kan worden geïntegreerd in een datapijplijn die op Airflow is gebouwd. Door de ontwikkelaarsvriendelijke API's van Gretel en het krachtige systeem van hooks en operators van Airflow te combineren, is het eenvoudig om ETL-pijplijnen te bouwen die gegevens toegankelijker en veiliger in gebruik maken.

We hebben ook gesproken over een veelvoorkomend gebruiksscenario voor feature-engineering waarbij gevoelige gegevens mogelijk niet gemakkelijk toegankelijk zijn. Door een synthetische versie van de dataset te genereren, verkleinen we het risico dat gevoelige gegevens worden vrijgegeven, maar behouden we het nut van de dataset en maken we deze snel beschikbaar voor degenen die deze nodig hebben.

Als we in meer abstracte termen nadenken over de functiepijplijn, hebben we nu een patroon dat kan worden hergebruikt voor een willekeurig aantal nieuwe SQL-query's. Door een nieuwe versie van de pijplijn te implementeren en de eerste SQL-query uit te wisselen, kunnen we elke potentieel gevoelige query voorzien van een synthetische dataset die de privacy van de klant behoudt. De enige regel code die moet worden gewijzigd, is het pad naar het sql-bestand. Geen complexe data-engineering vereist.

Bedankt voor het lezen

 
 
Stuur ons een e-mail op hallo@gretel.ai of kom erbij Slack als u vragen of opmerkingen heeft. We horen graag hoe je Airflow gebruikt en hoe we het beste kunnen integreren met je bestaande datapijplijnen.

 
Bio: Dre Newberry is een Software Engineer bij Gretel.ai.

ORIGINELE. Met toestemming opnieuw gepost.

Zie ook:

Bron: https://www.kdnuggets.com/2021/09/build-synthetic-data-pipeline-gretel-apache-airflow.html

Tijdstempel:

Meer van KDnuggets