Erstellen Sie eine synthetische Datenpipeline mit Gretel und Apache Airflow

Quellknoten: 1068200

Erstellen Sie eine synthetische Datenpipeline mit Gretel und Apache Airflow

In diesem Blogbeitrag erstellen wir eine ETL-Pipeline, die mithilfe von Gretels Synthetic Data APIs und Apache Airflow synthetische Daten aus einer PostgreSQL-Datenbank generiert.


By Draw Newberry, Software-Ingenieur bei Gretel.ai

Erstellen Sie eine synthetische Datenpipeline mit Gretel und Apache Airflow

Hey Leute, mein Name ist Drew und ich bin Software-Ingenieur hier bei Gretel. Ich habe vor kurzem über Muster für die Integration von Gretel-APIs in bestehende Tools nachgedacht, damit es einfach ist, Datenpipelines aufzubauen, bei denen Sicherheit und Kundendatenschutz erstklassige Funktionen sind und nicht nur ein nachträglicher Gedanke oder ein Kästchen, das überprüft werden muss.

Ein bei Gretel-Ingenieuren und Kunden beliebtes Data-Engineering-Tool ist Apache Airflow. Mit Gretel funktioniert es auch super. In diesem Blogbeitrag zeigen wir Ihnen, wie Sie mit Airflow, Gretel und PostgreSQL eine synthetische Datenpipeline aufbauen. Lass uns einspringen!

Was ist Luftstrom?

 
 
Airflow ist ein Tool zur Workflow-Automatisierung, das häufig zum Erstellen von Datenpipelines verwendet wird. Es ermöglicht Dateningenieuren oder Datenwissenschaftlern, diese Pipelines mithilfe von Python und anderen bekannten Konstrukten programmgesteuert zu definieren und bereitzustellen. Der Kern von Airflow ist das Konzept eines DAG oder gerichteten azyklischen Graphen. Ein Airflow-DAG bietet ein Modell und eine Reihe von APIs zum Definieren von Pipelinekomponenten, ihrer Abhängigkeiten und der Ausführungsreihenfolge.

Möglicherweise finden Sie Airflow-Pipelines, die Daten aus einer Produktdatenbank in ein Data Warehouse replizieren. Andere Pipelines können Abfragen ausführen, die normalisierte Daten zu einem einzigen Dataset zusammenfügen, das für Analysen oder Modellierung geeignet ist. Eine weitere Pipeline könnte einen täglichen Bericht veröffentlichen, der wichtige Geschäftskennzahlen zusammenfasst. Ein gemeinsames Thema dieser Anwendungsfälle: die Koordination des Datenverkehrs zwischen Systemen. Hier glänzt Airflow.

Nutzung von Airflow und seinem reichhaltigen Ökosystem an Integrationenkönnen Dateningenieure und Wissenschaftler eine beliebige Anzahl unterschiedlicher Tools oder Dienste in einer einzigen einheitlichen Pipeline orchestrieren, die einfach zu warten und zu betreiben ist. Nachdem wir diese Integrationsfähigkeiten verstanden haben, werden wir jetzt darüber sprechen, wie Gretel in eine Airflow-Pipeline integriert werden könnte, um gängige Arbeitsabläufe für Datenoperationen zu verbessern.

Wie passt Gretel dazu?

 
 
Unsere Mission bei Gretel ist es, die Arbeit mit Daten einfacher und sicherer zu machen. Wenn wir mit Kunden sprechen, ist ein Schmerzpunkt, von dem wir oft hören, der Zeit- und Arbeitsaufwand, der erforderlich ist, um Datenwissenschaftlern Zugang zu sensiblen Daten zu verschaffen. Verwenden von Gretel Synthetik, können wir das Risiko der Arbeit mit sensiblen Daten reduzieren, indem wir eine synthetische Kopie des Datensatzes erstellen. Durch die Integration von Gretel mit Airflow ist es möglich, Self-Service-Pipelines zu erstellen, die es Datenwissenschaftlern leicht machen, die benötigten Daten schnell zu erhalten, ohne dass für jede neue Datenanforderung ein Dateningenieur erforderlich ist.

Um diese Funktionen zu demonstrieren, erstellen wir eine ETL-Pipeline, die Benutzeraktivitätsfunktionen aus einer Datenbank extrahiert, eine synthetische Version des Datasets generiert und das Dataset in S3 speichert. Der in S3 gespeicherte synthetische Datensatz kann dann von Data Scientists für nachgelagerte Modellierungen oder Analysen verwendet werden, ohne die Privatsphäre der Kunden zu beeinträchtigen.

Lassen Sie uns zunächst die Pipeline aus der Vogelperspektive betrachten. Jeder Knoten in diesem Diagramm stellt einen Pipeline-Schritt oder eine „Aufgabe“ in Airflow-Begriffen dar.



Beispiel einer Gretel-Kunststoffpipeline auf Airflow.

 

Wir können die Pipeline in drei Phasen aufteilen, ähnlich wie bei einer ETL-Pipeline:

  • Extrahieren – Die Aufgabe extrahieren_features fragt eine Datenbank ab und wandelt die Daten in eine Reihe von Features um, die von Data Scientists zum Erstellen von Modellen verwendet werden können.
  • Synthetisieren – generate_synthetic_features nimmt die extrahierten Features als Eingabe, trainiert ein synthetisches Modell und generiert dann mithilfe von Gretel-APIs und Cloud-Diensten einen synthetischen Satz von Features.
  • Es gibt einen Teil der ...Laden Sie – upload_synthetic_features speichert den synthetischen Satz von Features in S3, wo er in jedes nachgelagerte Modell oder jede Analyse aufgenommen werden kann.

In den nächsten Abschnitten werden wir uns jeden dieser drei Schritte genauer ansehen. Wenn Sie jedem Codebeispiel folgen möchten, können Sie zu . gehen gretelai/gretel-airflow-pipelines und laden Sie den gesamten Code herunter, der in diesem Blogbeitrag verwendet wird. Das Repository enthält auch Anweisungen, die Sie befolgen können, um eine Airflow-Instanz zu starten und die Pipeline Ende-zu-Ende auszuführen.

Darüber hinaus kann es hilfreich sein, die Airflow-Pipeline in ihrer Gesamtheit zu betrachten, bevor wir jede Komponente analysieren. dags/airbnb_user_bookings.py. Die Code-Snippets in den folgenden Abschnitten werden aus der verknüpften Benutzerbuchungspipeline extrahiert.

Extrahieren von Funktionen

 
 
Die erste Aufgabe, Extract_features, ist dafür verantwortlich, Rohdaten aus der Quelldatenbank zu extrahieren und in eine Reihe von Features umzuwandeln. Dies ist eine gemeinsame Feature Engineering Problem, das Sie in jeder Pipeline für maschinelles Lernen oder Analyse finden könnten.

In unserer Beispielpipeline stellen wir eine PostgreSQL-Datenbank bereit und laden sie mit Buchungsdaten aus einem Airbnb-Kaggle-Wettbewerb.

Dieses Dataset enthält zwei Tabellen, Benutzer und Sitzungen. Sessions enthält eine Fremdschlüsselreferenz, user_id. Anhand dieser Beziehung erstellen wir eine Reihe von Funktionen mit verschiedenen Buchungsmetriken, die nach Benutzer aggregiert werden. Die folgende Abbildung stellt die SQL-Abfrage dar, die zum Erstellen der Features verwendet wird.

WITH session_features_by_user AS ( SELECT user_id, count(*) AS number_of_actions_taken, count(DISTINCT action_type) AS number_of_unique_actions, round(avg(secs_elapsed)) AS avg_session_time_seconds, round(max(secs_elapsed))(secs_sessione_lapsed(secs_session_min_time) min_session_time_seconds, ( SELECT count(*) FROM session s WHERE s.user_id = user_id AND s.action_type = 'booking_request') AS total_bookings FROM session GROUP BY user_id ) SELECT u.id AS user_id, u.gender, u.age, u .language, u.signup_method, u.date_account_created, s.number_of_actions_taken, s.number_of_unique_actions, s.avg_session_time_seconds, s.min_session_time_seconds, s.max_session_time_seconds FROM session_features


Die SQL-Abfrage wird dann von unserer Airflow-Pipeline ausgeführt und mithilfe der folgenden Aufgabendefinition an einen S3-Zwischenspeicherort geschrieben.

@task() def Extract_features(sql_file: str) -> str: context = get_current_context() sql_query = Path(sql_file).read_text() key = f"{context['dag_run'].run_id}_booking_features.csv" mit NamedTemporaryFile (mode="r+", suffix=".csv") as tmp_csv: postgres.copy_expert( f"copy ({sql_query}) to stdout with csv header", tmp_csv.name ) s3.load_file( filename=tmp_csv.name, key=key, ) Return-Taste


Die Eingabe für die Aufgabe, sql_file, bestimmt, welche Abfrage in der Datenbank ausgeführt werden soll. Diese Abfrage wird in die Aufgabe eingelesen und dann gegen die Datenbank ausgeführt. Die Ergebnisse der Abfrage werden dann in S3 geschrieben und der entfernte Dateischlüssel wird als Ausgabe der Aufgabe zurückgegeben.

Der Screenshot unten zeigt ein Beispielergebnissatz der Extraktionsabfrage von oben. Im nächsten Abschnitt wird beschrieben, wie Sie eine synthetische Version dieses Datensatzes erstellen.



Vorschau der Abfrageergebnisse.

Synthetisieren Sie Funktionen mit Gretel-APIs

 
 
Um eine synthetische Version jedes Features zu generieren, müssen wir zuerst ein synthetisches Modell trainieren und dann das Modell ausführen, um synthetische Datensätze zu generieren. Gretel verfügt über eine Reihe von Python-SDKs, die die Integration in Airflow-Aufgaben erleichtern.

Zusätzlich zu den Python-Client-SDKs haben wir eine Gretel Airflow-Haken die Gretel-API-Verbindungen und -Geheimnisse verwaltet. Nach dem Einrichten einer Gretel Airflow-Verbindung ist die Verbindung mit der Gretel-API so einfach wie

aus Hooks.gretel importieren GretelHook gretel = GretelHook() project = gretel.get_project()


Weitere Informationen zum Konfigurieren von Airflow-Verbindungen finden Sie in unserem Github-Repository README.

Die Projektvariable im obigen Beispiel kann als Haupteinstiegspunkt zum Trainieren und Ausführen synthetischer Modelle mit Gretels API verwendet werden. Weitere Informationen finden Sie in unserem Python-API-Dokumente.

In Bezug auf die Buchungspipeline werden wir nun die Aufgabe "generate_synthetic_features" überprüfen. Dieser Schritt ist für das Trainieren des synthetischen Modells unter Verwendung der in der vorherigen Aufgabe extrahierten Merkmale verantwortlich.

@task() def generate_synthetic_features(data_source: str) -> str: project = gretel.get_project() model = project.create_model_obj( model_config="synthetics/default", data_source=s3.download_file(data_source) ) model.submit_cloud() poll(model) return model.get_artifact_link("data_preview")


Wenn Sie sich die Methodensignatur ansehen, sehen Sie, dass sie einen Pfad, data_source, verwendet. Dieser Wert verweist auf die im vorherigen Schritt extrahierten S3-Features. In einem späteren Abschnitt werden wir durchgehen, wie all diese Ein- und Ausgänge miteinander verdrahtet sind.

Beim Erstellen des Modells mit project.create_model_obj stellt der Parameter model_config die synthetische Modellkonfiguration dar, die zum Generieren des Modells verwendet wird. In dieser Pipeline verwenden wir unsere Standardmodellkonfiguration, aber viele andere Konfigurationsoptionen stehen zur Verfügung.

Nachdem das Modell konfiguriert wurde, rufen wir model.submit_cloud() auf. Dadurch wird das Modell für das Training und die Datensatzerstellung mit Gretel Cloud übermittelt. Der Aufruf von poll(model) blockiert die Aufgabe, bis das Modell das Training abgeschlossen hat.

Nachdem das Modell nun trainiert wurde, verwenden wir get_artifact_link, um einen Link zum Herunterladen der generierten synthetischen Features zurückzugeben.



Datenvorschau des synthetischen Funktionssatzes.

 

Dieser Artefaktlink wird als Eingabe für den abschließenden Schritt upload_synthetic_features verwendet.

Synthetische Funktionen laden

 
 
Die ursprünglichen Merkmale wurden extrahiert und eine synthetische Version wurde erstellt. Jetzt ist es an der Zeit, die synthetischen Funktionen hochzuladen, damit nachgeschaltete Verbraucher darauf zugreifen können. In diesem Beispiel verwenden wir einen S3-Bucket als endgültiges Ziel für das Dataset.

@task() def upload_synthetic_features(data_set: str): context = get_current_context() with open(data_set, "rb") as synth_features: s3.load_file_obj( file_obj=synth_features, key=f"{..._booking_features_synthetic.csv", )


Diese Aufgabe ist ziemlich einfach. Der Eingabewert data_set enthält einen signierten HTTP-Link zum Herunterladen des synthetischen Datensatzes von Gretels API. Die Aufgabe liest diese Datei in den Airflow-Worker ein und verwendet dann den bereits konfigurierten S3-Hook, um die synthetische Feature-Datei in einen S3-Bucket hochzuladen, wo nachgeschaltete Verbraucher oder Modelle darauf zugreifen können.

Orchestrierung der Pipeline

 
 
In den letzten drei Abschnitten haben wir den gesamten Code durchlaufen, der zum Extrahieren, Synthetisieren und Laden eines Datensatzes erforderlich ist. Der letzte Schritt besteht darin, jede dieser Aufgaben zu einer einzigen Airflow-Pipeline zusammenzufassen.

Wenn Sie sich an den Anfang dieses Beitrags erinnern, haben wir kurz das Konzept eines DAG erwähnt. Mit der TaskFlow-API von Airflow können wir diese drei Python-Methoden zu einem DAG zusammensetzen, der die Eingaben, Ausgaben und die Reihenfolge definiert, in der jeder Schritt ausgeführt wird.

feature_path = Extract_features( "/opt/airflow/dags/sql/session_rollups__by_user.sql" ) synthetisch_data = generate_synthetic_features(feature_path) upload_synthetic_features(synthetic_data)


Wenn Sie dem Pfad dieser Methodenaufrufe folgen, erhalten Sie schließlich ein Diagramm, das wie unsere ursprüngliche Feature-Pipeline aussieht.



Gretel-Kunststoffpipeline auf Airflow.

 

Wenn Sie diese Pipeline ausführen und in Aktion sehen möchten, gehen Sie zum begleitendes Github-Repository. Dort finden Sie Anweisungen zum Starten einer Airflow-Instanz und zum Ausführen der Pipeline End-to-End.

Dinge einpacken

 
 
Wenn Sie es bis hierher geschafft haben, haben Sie gesehen, wie Gretel in eine auf Airflow basierende Datenpipeline integriert werden kann. Durch die Kombination der entwicklerfreundlichen APIs von Gretel und des leistungsstarken Systems von Hooks und Operatoren von Airflow ist es einfach, ETL-Pipelines zu erstellen, die Daten leichter zugänglich und sicherer machen.

Wir haben auch über einen allgemeinen Anwendungsfall im Feature-Engineering gesprochen, bei dem sensible Daten möglicherweise nicht ohne weiteres zugänglich sind. Durch die Generierung einer synthetischen Version des Datensatzes reduzieren wir das Risiko der Offenlegung sensibler Daten, behalten jedoch den Nutzen des Datensatzes bei und stellen ihn gleichzeitig denjenigen zur Verfügung, die ihn benötigen.

Betrachten wir die Feature-Pipeline in abstrakteren Begriffen, haben wir jetzt ein Muster, das für eine beliebige Anzahl neuer SQL-Abfragen umfunktioniert werden kann. Durch die Bereitstellung einer neuen Version der Pipeline und den Austausch der ursprünglichen SQL-Abfrage können wir jede potenziell sensible Abfrage mit einem synthetischen Dataset versehen, das die Privatsphäre der Kunden schützt. Die einzige Codezeile, die geändert werden muss, ist der Pfad zur SQL-Datei. Kein aufwändiges Data Engineering erforderlich.

Danke fürs Lesen

 
 
Senden Sie uns eine E-Mail an hallo@gretel.ai oder komm zu uns Slack wenn Sie Fragen oder Anmerkungen haben. Wir würden gerne erfahren, wie Sie Airflow verwenden und wie wir am besten in Ihre bestehenden Datenpipelines integrieren können.

 
Bio: Draw Newberry ist Software-Ingenieur bei Gretel.ai.

Original. Mit Genehmigung erneut veröffentlicht.

Related:

Quelle: https://www.kdnuggets.com/2021/09/build-synthetic-data-pipeline-gretel-apache-airflow.html

Zeitstempel:

Mehr von KDnuggets