Construiți o conductă de date sintetice folosind Gretel și Apache Airflow

Nodul sursă: 1068200

Construiți o conductă de date sintetice folosind Gretel și Apache Airflow

În această postare pe blog, construim o conductă ETL care generează date sintetice dintr-o bază de date PostgreSQL folosind API-urile Gretel pentru date sintetice și Apache Airflow.


By Drew Newberry, Inginer software la Gretel.ai

Construiți o conductă de date sintetice folosind Gretel și Apache Airflow

Bună, oameni buni, numele meu este Drew și sunt inginer de software aici, la Gretel. Recent, m-am gândit la modele pentru integrarea API-urilor Gretel în instrumentele existente, astfel încât să fie ușor să construiți conducte de date în care securitatea și confidențialitatea clienților sunt caracteristici de primă clasă, nu doar un gând ulterior sau o casetă de verificat.

Un instrument de inginerie a datelor care este popular printre inginerii și clienții Gretel este Apache Airflow. De asemenea, se întâmplă să funcționeze grozav cu Gretel. În această postare pe blog, vă vom arăta cum să construiți o conductă de date sintetice folosind Airflow, Gretel și PostgreSQL. Să sărim înăuntru!

Ce este Airflow

 
 
Debit de aer este un instrument de automatizare a fluxului de lucru folosit în mod obișnuit pentru a construi conducte de date. Le permite inginerilor de date sau oamenilor de știință de date să definească și să implementeze în mod programatic aceste conducte folosind Python și alte constructe familiare. La baza fluxului de aer se află conceptul de DAG sau grafic aciclic direcționat. Un DAG Airflow oferă un model și un set de API-uri pentru definirea componentelor conductei, dependențele acestora și ordinea de execuție.

Este posibil să găsiți conducte Airflow care replică datele dintr-o bază de date de produse într-un depozit de date. Alte conducte pot executa interogări care unesc date normalizate într-un singur set de date potrivit pentru analiză sau modelare. Totuși, un alt pipeline ar putea publica un raport zilnic care cumulează valorile cheie ale afacerii. O temă comună împărtășită între aceste cazuri de utilizare: coordonarea mișcării datelor între sisteme. Aici strălucește Airflow.

Folosind fluxul de aer și ecosistemul său bogat de integrările, inginerii de date și oamenii de știință pot orchestra orice număr de instrumente sau servicii disparate într-o singură conductă unificată care este ușor de întreținut și operat. Cu o înțelegere a acestor capacități de integrare, vom începe acum să vorbim despre modul în care Gretel ar putea fi integrată într-o conductă Airflow pentru a îmbunătăți fluxurile de lucru comune ale operațiunilor de date.

Cum se potrivește Gretel?

 
 
La Gretel, misiunea noastră este să facem lucrul cu datele mai ușor și mai sigur. Vorbind cu clienții, un punct dureros despre care auzim adesea este timpul și efortul necesar pentru a obține accesul oamenilor de știință la date sensibile. Folosind Gretel sintetice, putem reduce riscul de a lucra cu date sensibile prin generarea unei copii sintetice a setului de date. Prin integrarea Gretel cu Airflow, este posibil să se creeze conducte de autoservire care facilitează ca oamenii de știință să obțină rapid datele de care au nevoie, fără a necesita un inginer de date pentru fiecare cerere nouă de date.

Pentru a demonstra aceste capabilități, vom construi o conductă ETL care extrage caracteristicile activității utilizatorilor dintr-o bază de date, generează o versiune sintetică a setului de date și salvează setul de date în S3. Cu setul de date sintetice salvat în S3, acesta poate fi apoi utilizat de oamenii de știință de date pentru modelare sau analiză în aval, fără a compromite confidențialitatea clienților.

Pentru a începe lucrurile, să luăm mai întâi o vedere de pasăre a conductei. Fiecare nod din această diagramă reprezintă un pas de conductă, sau „sarcină” în termeni Airflow.



Exemplu de conducte sintetice Gretel pe Airflow.

 

Putem împărți conducta în 3 etape, similar cu ceea ce ați putea găsi într-o conductă ETL:

  • Extrage – Sarcina extract_features va interoga o bază de date și va transforma datele într-un set de caracteristici care pot fi folosite de oamenii de știință de date pentru a construi modele.
  • Sintetiza – generate_synthetic_features va prelua funcțiile extrase ca intrare, va antrena un model sintetic și apoi va genera un set sintetic de caracteristici folosind API-urile Gretel și serviciile cloud.
  • A incarca – upload_synthetic_features salvează setul sintetic de caracteristici pe S3, unde poate fi ingerat în orice model sau analiză din aval.

În următoarele câteva secțiuni vom aborda fiecare dintre acești trei pași mai detaliat. Dacă doriți să urmăriți fiecare eșantion de cod, vă puteți adresa gretelai/gretel-airflow-pipelines și descărcați tot codul folosit în această postare de blog. Repo conține, de asemenea, instrucțiuni pe care le puteți urma pentru a porni o instanță Airflow și pentru a rula conducta de la capăt la capăt.

În plus, poate fi util să vizualizați conducta Airflow în întregime, înainte de a diseca fiecare componentă, dags/airbnb_user_bookings.py. Fragmentele de cod din secțiunile următoare sunt extrase din canalul de rezervări de utilizator conectat.

Extrage caracteristici

 
 
Prima sarcină, extract_features este responsabilă pentru extragerea datelor brute din baza de date sursă și transformarea acestora într-un set de caracteristici. Acesta este un lucru comun inginerie de caracteristici problemă pe care o puteți găsi în orice conductă de învățare automată sau de analiză.

În exemplul nostru, vom furniza o bază de date PostgreSQL și o vom încărca cu date de rezervare de la un Concursul Airbnb Kaggle.

Acest set de date conține două tabele, Utilizatori și Sesiuni. Sesiunile conține o referință la cheie străină, user_id. Folosind această relație, vom crea un set de caracteristici care conțin diferite valori de rezervare agregate în funcție de utilizator. Următoarea figură reprezintă interogarea SQL utilizată pentru a construi caracteristicile.

WITH session_features_by_user AS ( SELECT user_id, count(*) AS number_of_actions_taken, count(DISTINCT action_type) AS number_of_unique_actions, round(avg(secs_elapsed)) AS avg_session_time_seconds, round(max(secs_lapses) AS_time_lapses_secs) min_session_time_seconds, ( SELECT count(*) FROM sessions WHERE s.user_id = user_id AND s.action_type = 'booking_request') AS total_bookings FROM sessions GROUP BY user_id ) SELECT u.id AS user_id, u.gender, u.age, u .language, u.signup_method, u.date_account_created, s.number_of_actions_taken, s.number_of_unique_actions, s.avg_session_time_seconds, s.min_session_time_seconds, s.max_session_time_time_seconds, s.max_session_time_time_seconds .5000.


Interogarea SQL este apoi executată din conducta noastră Airflow și scrisă într-o locație intermediară S3 folosind următoarea definiție a sarcinii.

@task() def extract_features(sql_file: str) -> str: context = get_current_context() sql_query = Path(sql_file).read_text() key = f"{context['dag_run'].run_id}_booking_features.csv" cu NamedTemporary (mode="r+", suffix=".csv") ca tmp_csv: postgres.copy_expert( f"copy ({sql_query}) to stdout with csv header", tmp_csv.name ) s3.load_file( filename=tmp_csv.name, cheie=cheie, ) cheie return


Intrarea în sarcină, sql_file, determină ce interogare să ruleze în baza de date. Această interogare va fi citită în sarcină și apoi executată în baza de date. Rezultatele interogării vor fi apoi scrise în S3 și cheia fișierului de la distanță va fi returnată ca rezultat al sarcinii.

Captura de ecran de mai jos arată un exemplu de set de rezultate ale interogării de extragere de mai sus. Vom descrie cum să creați o versiune sintetică a acestui set de date în secțiunea următoare.



Previzualizarea rezultatelor interogării.

Sintetizați funcțiile folosind API-urile Gretel

 
 
Pentru a genera o versiune sintetică a fiecărei caracteristici, trebuie mai întâi să antrenăm un model sintetic și apoi să rulăm modelul pentru a genera înregistrări sintetice. Gretel are un set de SDK-uri Python care facilitează integrarea în sarcinile Airflow.

Pe lângă SDK-urile Python Client, am creat un Cârlig pentru fluxul de aer Gretel care gestionează conexiunile și secretele Gretel API. După configurarea unei conexiuni Gretel Airflow, conectarea la API-ul Gretel este la fel de ușoară

din hooks.gretel import GretelHook gretel = GretelHook() proiect = gretel.get_project()


Pentru mai multe informații despre cum să configurați conexiunile Airflow, vă rugăm să consultați depozitul nostru Github README.

Variabila de proiect din exemplul de mai sus poate fi folosită ca punct de intrare principal pentru antrenarea și rularea modelelor sintetice folosind API-ul Gretel. Pentru mai multe detalii, ne puteți consulta Documente Python API.

Referindu-ne la canalul de rezervări, vom examina acum sarcina generate_synthetic_features. Acest pas este responsabil pentru antrenarea modelului sintetic folosind caracteristicile extrase în sarcina anterioară.

@task() def generate_synthetic_features(data_source: str) -> str: project = gretel.get_project() model = project.create_model_obj( model_config="synthetics/default", data_source=s3.download_file(data_source) ) model.submit_cloud() sondaj(model) return model.get_artifact_link("data_preview")


Privind semnătura metodei, veți vedea că are o cale, data_source. Această valoare indică caracteristicile S3 extrase în pasul anterior. Într-o secțiune ulterioară vom analiza modul în care toate aceste intrări și ieșiri sunt conectate împreună.

Când se creează modelul folosind project.create_model_obj, parametrul model_config reprezintă configurația sintetică a modelului utilizată pentru a genera modelul. În această conductă, folosim sistemul nostru config model implicit, dar multe altele Opțiunile de configurare sunt disponibile.

După ce modelul a fost configurat, apelăm model.submit_cloud(). Acesta va trimite modelul pentru antrenament și generarea de înregistrări folosind Gretel Cloud. Apelarea sondajului (modelului) va bloca sarcina până când modelul va finaliza antrenamentul.

Acum că modelul a fost antrenat, vom folosi get_artifact_link pentru a returna un link pentru a descărca funcțiile sintetice generate.



Previzualizarea datelor setului sintetic de caracteristici.

 

Acest link artefact va fi folosit ca intrare la pasul final upload_synthetic_features.

Încărcați caracteristici sintetice

 
 
Caracteristicile originale au fost extrase și a fost creată o versiune sintetică. Acum este timpul să încărcați caracteristicile sintetice, astfel încât să poată fi accesate de consumatorii din aval. În acest exemplu, vom folosi o găleată S3 ca destinație finală pentru setul de date.

@task() def upload_synthetic_features(data_set: str): context = get_current_context() cu open(data_set, "rb") ca synth_features: s3.load_file_obj( file_obj=synth_features, key=f"{..._booking_features._synthetic", )


Această sarcină este destul de simplă. Valoarea de intrare data_set conține un link HTTP semnat pentru a descărca setul de date sintetice din API-ul Gretel. Sarcina va citi acel fișier în Airflow worker și apoi va folosi cârligul S3 deja configurat pentru a încărca fișierul de caracteristici sintetice într-o găleată S3 unde consumatorii sau modelele din aval îl pot accesa.

Orchestrarea conductei

 
 
În ultimele trei secțiuni, am parcurs tot codul necesar pentru a extrage, sintetiza și încărca un set de date. Ultimul pas este să legați fiecare dintre aceste sarcini împreună într-o singură conductă Airflow.

Dacă vă veți aminti de începutul acestei postări, am menționat pe scurt conceptul de DAG. Folosind API-ul TaskFlow de la Airflow, putem compune aceste trei metode Python într-un DAG care definește intrările, ieșirile și ordinea în care fiecare pas va fi rulat.

feature_path = extract_features( "/opt/airflow/dags/sql/session_rollups__by_user.sql" ) synthetic_data = generate_synthetic_features(feature_path) upload_synthetic_features(synthetic_data)


Dacă urmați calea acestor apeluri de metodă, veți obține în cele din urmă un grafic care arată ca pipeline-ul nostru de caracteristici inițiale.



Conducta de sintetice Gretel pe Airflow.

 

Dacă doriți să rulați această conductă și să o vedeți în acțiune, mergeți la depozitul Github care însoțește. Acolo veți găsi instrucțiuni despre cum să porniți o instanță Airflow și să rulați conducta de la capăt la capăt.

Înfășurați lucrurile

 
 
Dacă ați ajuns până aici, ați văzut cum Gretel poate fi integrat într-o conductă de date construită pe Airflow. Combinând API-urile Gretel prietenoase cu dezvoltatorii și sistemul puternic de cârlige și operatori Airflow, este ușor să construiți conducte ETL care fac datele mai accesibile și mai sigur de utilizat.

Am vorbit, de asemenea, despre un caz de utilizare comun al ingineriei caracteristicilor în care este posibil ca datele sensibile să nu fie ușor accesibile. Prin generarea unei versiuni sintetice a setului de date, reducem riscul expunerii oricăror date sensibile, dar păstrăm totuși utilitatea setului de date, făcându-l rapid disponibil celor care au nevoie de el.

Gândindu-ne la pipeline de caracteristici în termeni mai abstracti, acum avem un model care poate fi reutilizat pentru orice număr de interogări SQL noi. Prin implementarea unei noi versiuni a conductei și schimbând interogarea inițială SQL, putem face față oricărei interogări potențial sensibile cu un set de date sintetice care păstrează confidențialitatea clienților. Singura linie de cod care trebuie schimbată este calea către fișierul sql. Nu este necesară inginerie complexă a datelor.

Vă mulțumim pentru lectură

 
 
Trimite-ne un e-mail la salut@gretel.ai sau vino alături de noi Moale dacă aveți întrebări sau comentarii. Ne-ar plăcea să aflăm cum utilizați Airflow și cum ne putem integra cel mai bine cu conductele dvs. de date existente.

 
Bio: Drew Newberry este inginer software la Gretel.ai.

Original. Repostat cu permisiunea.

Related:

Sursa: https://www.kdnuggets.com/2021/09/build-synthetic-data-pipeline-gretel-apache-airflow.html

Timestamp-ul:

Mai mult de la KDnuggets