Crea una pipeline di dati sintetici utilizzando Gretel e Apache Airflow

Nodo di origine: 1068200

Crea una pipeline di dati sintetici utilizzando Gretel e Apache Airflow

In questo post del blog, creiamo una pipeline ETL che genera dati sintetici da un database PostgreSQL utilizzando le API di dati sintetici di Gretel e Apache Airflow.


By Ha disegnato Newberry, Software Engineer presso Gretel.ai

Crea una pipeline di dati sintetici utilizzando Gretel e Apache Airflow

Ehi gente, mi chiamo Drew e sono un ingegnere del software qui alla Gretel. Di recente ho pensato a modelli per l'integrazione delle API Gretel negli strumenti esistenti in modo che sia facile creare pipeline di dati in cui la sicurezza e la privacy dei clienti sono caratteristiche di prima classe, non solo un ripensamento o una casella da controllare.

Uno strumento di ingegneria dei dati popolare tra gli ingegneri ei clienti Gretel è Apache Airflow. Capita anche di funzionare alla grande con Gretel. In questo post del blog, ti mostreremo come creare una pipeline di dati sintetici utilizzando Airflow, Gretel e PostgreSQL. Facciamo un salto!

Cos'è il flusso d'aria?

 
 
Flusso d'aria è uno strumento di automazione del flusso di lavoro comunemente utilizzato per creare pipeline di dati. Consente ai data engineer o ai data scientist di definire e distribuire in modo programmatico queste pipeline utilizzando Python e altri costrutti familiari. Al centro di Airflow c'è il concetto di DAG, o grafico aciclico diretto. Un DAG Airflow fornisce un modello e un set di API per definire i componenti della pipeline, le loro dipendenze e l'ordine di esecuzione.

Potresti trovare pipeline Airflow che replicano i dati da un database di prodotti in un data warehouse. Altre pipeline potrebbero eseguire query che uniscono dati normalizzati in un singolo set di dati adatto per l'analisi o la modellazione. Un'altra pipeline potrebbe pubblicare un report giornaliero che aggrega le metriche aziendali chiave. Un tema comune condiviso tra questi casi d'uso: coordinare il movimento dei dati tra i sistemi. È qui che brilla il flusso d'aria.

Sfruttando Airflow e il suo ricco ecosistema di integrazioni, i data engineer e gli scienziati possono orchestrare un numero qualsiasi di strumenti o servizi disparati in un'unica pipeline unificata di facile manutenzione e funzionamento. Con una comprensione di queste capacità di integrazione, inizieremo ora a parlare di come Gretel potrebbe essere integrato in una pipeline Airflow per migliorare i flussi di lavoro comuni delle operazioni sui dati.

Come si inserisce Gretel?

 
 
In Gretel, la nostra missione è rendere i dati più facili e sicuri con cui lavorare. Parlando con i clienti, un punto dolente di cui sentiamo spesso parlare è il tempo e lo sforzo necessari per consentire ai data scientist di accedere ai dati sensibili. Usando Gretel Sintetici, possiamo ridurre il rischio di lavorare con dati sensibili generando una copia sintetica del set di dati. Integrando Gretel con Airflow, è possibile creare pipeline self-service che consentono ai data scientist di ottenere rapidamente i dati di cui hanno bisogno senza richiedere un data engineer per ogni nuova richiesta di dati.

Per dimostrare queste capacità, creeremo una pipeline ETL che estrae le funzionalità dell'attività dell'utente da un database, genera una versione sintetica del set di dati e salva il set di dati in S3. Con il set di dati sintetico salvato in S3, può quindi essere utilizzato dai data scientist per la modellazione o l'analisi a valle senza compromettere la privacy dei clienti.

Per iniziare, diamo prima una veduta a volo d'uccello dell'oleodotto. Ogni nodo in questo diagramma rappresenta un passaggio della pipeline, o "attività" in termini di flusso d'aria.



Esempio di pipeline sintetica Gretel su Airflow.

 

Possiamo suddividere la pipeline in 3 fasi, in modo simile a quello che potresti trovare in una pipeline ETL:

  • Estratto – L'attività extract_features eseguirà una query su un database e trasformerà i dati in un insieme di funzionalità che possono essere utilizzate dai data scientist per la creazione di modelli.
  • Sintetizzare – generate_synthetic_features prenderà le funzionalità estratte come input, formerà un modello sintetico e quindi genererà un insieme sintetico di funzionalità utilizzando le API Gretel e i servizi cloud.
  • Caricare – upload_synthetic_features salva il set sintetico di funzionalità in S3 dove può essere inserito in qualsiasi modello o analisi a valle.

Nelle prossime sezioni analizzeremo ciascuno di questi tre passaggi in modo più dettagliato. Se desideri seguire ogni esempio di codice, puoi andare su gretelai/gretel-airflow-oleodotti e scarica tutto il codice utilizzato in questo post del blog. Il repository contiene anche le istruzioni che puoi seguire per avviare un'istanza Airflow ed eseguire la pipeline end-to-end.

Inoltre, potrebbe essere utile visualizzare la pipeline del flusso d'aria nella sua interezza, prima di sezionare ogni componente, dags/airbnb_user_bookings.py. I frammenti di codice nelle sezioni seguenti vengono estratti dalla pipeline di prenotazione dell'utente collegata.

Estrai caratteristiche

 
 
La prima attività, extract_features è responsabile dell'estrazione dei dati grezzi dal database di origine e della loro trasformazione in un insieme di funzionalità. Questo è un comune ingegneria delle caratteristiche problema che potresti riscontrare in qualsiasi pipeline di machine learning o analisi.

Nella nostra pipeline di esempio forniremo un database PostgreSQL e lo caricheremo con i dati di prenotazione da un Concorso Airbnb Kaggle.

Questo set di dati contiene due tabelle, Utenti e Sessioni. Sessions contiene un riferimento a una chiave esterna, user_id. Utilizzando questa relazione, creeremo una serie di funzionalità contenenti varie metriche di prenotazione aggregate per utente. La figura seguente rappresenta la query SQL utilizzata per creare le funzionalità.

WITH session_features_by_user AS ( SELECT user_id, count(*) AS numero_di_azioni_intraprese, count(DISTINCT action_type) AS numero_di_azioni_uniche, round(avg(secs_elapsed)) AS avg_session_time_seconds, round(max(secs_elapsed)_dseconds(secs_elapsed)) AS maxsec_session_time min_session_time_seconds, ( SELECT count(*) FROM sessioni s WHERE s.user_id = user_id AND s.action_type = 'booking_request') AS total_bookings FROM sessioni GROUP BY user_id ) SELECT u.id AS user_id, u.gender, u.age, u .language, u.signup_method, u.date_account_created, s.number_of_actions_taken, s.number_of_unique_actions, s.avg_session_time_seconds, s.min_session_time_seconds, s.max_session_time_seconds DA u id user_LIMIT id 5000 utenti LEFT_LIMIT.


La query SQL viene quindi eseguita dalla nostra pipeline Airflow e scritta in una posizione S3 intermedia utilizzando la seguente definizione di attività.

@task() def extract_features(sql_file: str) -> str: context = get_current_context() sql_query = Path(sql_file).read_text() key = f"{context['dag_run'].run_id}_booking_features.csv" con NamedTemporaryFile (mode="r+", suffix=".csv") come tmp_csv: postgres.copy_expert( f"copy ({sql_query}) su stdout con intestazione csv", tmp_csv.name ) s3.load_file( filename=tmp_csv.name, chiave=chiave, ) tasto di ritorno


L'input per l'attività, sql_file, determina quale query eseguire sul database. Questa query verrà letta nell'attività e quindi eseguita sul database. I risultati della query verranno quindi scritti in S3 e la chiave del file remoto verrà restituita come output dell'attività.

Lo screenshot seguente mostra un set di risultati di esempio della query di estrazione dall'alto. Descriveremo come creare una versione sintetica di questo set di dati nella sezione successiva.



Anteprima dei risultati della query.

Sintetizzare le funzionalità utilizzando le API di Gretel

 
 
Per generare una versione sintetica di ogni funzionalità, dobbiamo prima addestrare un modello sintetico, quindi eseguire il modello per generare record sintetici. Gretel ha una serie di SDK Python che facilitano l'integrazione nelle attività di Airflow.

Oltre ai Python Client SDK, abbiamo creato un Gancio del flusso d'aria Gretel che gestisce le connessioni e i segreti dell'API Gretel. Dopo aver impostato una connessione Gretel Airflow, connettersi all'API Gretel è facile come

da hooks.gretel import GretelHook gretel = GretelHook() project = gretel.get_project()


Per ulteriori informazioni su come configurare le connessioni Airflow, fare riferimento al nostro repository Github README.

La variabile di progetto nell'esempio precedente può essere utilizzata come punto di ingresso principale per l'addestramento e l'esecuzione di modelli sintetici utilizzando l'API di Gretel. Per maggiori dettagli, puoi controllare il nostro Documenti API Python.

Facendo riferimento alla pipeline di prenotazione, esamineremo ora l'attività generate_synthetic_features. Questo passaggio è responsabile dell'addestramento del modello sintetico utilizzando le funzionalità estratte nell'attività precedente.

@task() def generate_synthetic_features(data_source: str) -> str: project = gretel.get_project() model = project.create_model_obj( model_config="synthetics/default", data_source=s3.download_file(data_source) ) model.submit_cloud() poll(model) restituisce model.get_artifact_link("data_preview")


Guardando la firma del metodo, vedrai che prende un percorso, data_source. Questo valore punta alle caratteristiche S3 estratte nel passaggio precedente. In una sezione successiva esamineremo come tutti questi ingressi e queste uscite sono collegati insieme.

Quando si crea il modello utilizzando project.create_model_obj, il parametro model_config rappresenta la configurazione del modello sintetico utilizzata per generare il modello. In questa pipeline, stiamo usando il nostro configurazione del modello predefinito, ma molti altri Opzioni di configurazione sono disponibili.

Dopo che il modello è stato configurato, chiamiamo model.submit_cloud(). Questo invierà il modello per l'addestramento e la generazione di record utilizzando Gretel Cloud. La chiamata a poll(model) bloccherà l'attività fino a quando il modello non avrà completato l'addestramento.

Ora che il modello è stato addestrato, utilizzeremo get_artifact_link per restituire un collegamento per scaricare le funzionalità sintetiche generate.



Anteprima dei dati dell'insieme sintetico di funzionalità.

 

Questo collegamento all'elemento verrà utilizzato come input per il passaggio finale upload_synthetic_features.

Carica caratteristiche sintetiche

 
 
Le caratteristiche originali sono state estratte ed è stata creata una versione sintetica. Ora è il momento di caricare le funzionalità sintetiche in modo che possano essere accessibili ai consumatori a valle. In questo esempio, utilizzeremo un bucket S3 come destinazione finale per il set di dati.

@task() def upload_synthetic_features(data_set: str): context = get_current_context() with open(data_set, "rb") as synth_features: s3.load_file_obj( file_obj=synth_features, key=f"{..._booking_features_synthetic.cs", )


Questo compito è piuttosto semplice. Il valore di input data_set contiene un collegamento HTTP firmato per scaricare il dataset sintetico dall'API di Gretel. L'attività leggerà quel file nell'operatore Airflow, quindi utilizzerà l'hook S3 già configurato per caricare il file della funzionalità sintetica in un bucket S3 in cui i consumatori oi modelli a valle possono accedervi.

Orchestrare la pipeline

 
 
Nelle ultime tre sezioni abbiamo esaminato tutto il codice necessario per estrarre, sintetizzare e caricare un set di dati. L'ultimo passaggio consiste nel collegare ciascuna di queste attività in un'unica pipeline del flusso d'aria.

Se ricorderete all'inizio di questo post, abbiamo brevemente accennato al concetto di DAG. Utilizzando l'API TaskFlow di Airflow possiamo comporre questi tre metodi Python in un DAG che definisce gli input, gli output e l'ordine in cui verrà eseguito ogni passaggio.

feature_path = extract_features( "/opt/airflow/dags/sql/session_rollups__by_user.sql" ) Synthetic_data = generate_synthetic_features(feature_path) upload_synthetic_features(synthetic_data)


Se segui il percorso di queste chiamate al metodo, alla fine otterrai un grafico che assomiglia alla nostra pipeline di funzionalità originale.



Conduttura sintetica Gretel su Airflow.

 

Se vuoi eseguire questa pipeline e vederla in azione, vai su che accompagna il repository Github. Lì troverai le istruzioni su come avviare un'istanza Airflow ed eseguire la pipeline end-to-end.

Avvolgere le cose

 
 
Se sei arrivato fino a questo punto, hai visto come Gretel può essere integrato in una pipeline di dati basata su Airflow. Combinando le API intuitive per gli sviluppatori di Gretel e il potente sistema di hook e operatori di Airflow, è facile creare pipeline ETL che rendono i dati più accessibili e più sicuri da usare.

Abbiamo anche parlato di un caso d'uso comune di ingegneria delle funzionalità in cui i dati sensibili potrebbero non essere facilmente accessibili. Generando una versione sintetica del set di dati, riduciamo il rischio di esporre dati sensibili, ma manteniamo comunque l'utilità del set di dati, rendendolo rapidamente disponibile a chi ne ha bisogno.

Pensando alla pipeline delle funzionalità in termini più astratti, ora abbiamo un modello che può essere riproposto per qualsiasi numero di nuove query SQL. Distribuendo una nuova versione della pipeline e sostituendo la query SQL iniziale, possiamo affrontare qualsiasi query potenzialmente sensibile con un set di dati sintetico che preserva la privacy del cliente. L'unica riga di codice che deve essere modificata è il percorso del file sql. Nessuna complessa ingegneria dei dati richiesta.

Grazie per aver letto

 
 
Inviaci una mail a ciao@gretel.ai o vieni con noi in Slack se avete domande o commenti. Ci piacerebbe sapere come stai utilizzando Airflow e come possiamo integrarci al meglio con le tue pipeline di dati esistenti.

 
Bio: Ha disegnato Newberry è un ingegnere informatico presso Gretel.ai.

Originale. Ripubblicato con il permesso.

Correlato:

Fonte: https://www.kdnuggets.com/2021/09/build-synthetic-data-pipeline-gretel-apache-airflow.html

Timestamp:

Di più da KDnuggets