Készítsen szintetikus adatfolyamot a Gretel és az Apache Airflow segítségével

Forrás csomópont: 1068200

Készítsen szintetikus adatfolyamot a Gretel és az Apache Airflow segítségével

Ebben a blogbejegyzésben egy ETL-folyamatot építünk, amely szintetikus adatokat generál PostgreSQL adatbázisból a Gretel Synthetic Data API-k és az Apache Airflow segítségével.


By Drew Newberry, szoftvermérnök a Gretel.ai-nál

Készítsen szintetikus adatfolyamot a Gretel és az Apache Airflow segítségével

Hey folks, my name is Drew, and I’m a software engineer here at Gretel. I’ve recently been thinking about patterns for integrating Gretel APIs into existing tools so that it’s easy to build data pipelines where security and customer privacy are first-class features, not just an afterthought or box to check.

One data engineering tool that is popular amongst Gretel engineers and customers is Apache Airflow. It also happens to work great with Gretel. In this blog post, we’ll show you how to build a synthetic data pipeline using Airflow, Gretel and PostgreSQL. Let’s jump in!

Mi az a légáramlás

 
 
Légáram egy munkafolyamat-automatizálási eszköz, amelyet gyakran használnak adatfolyamok felépítésére. Lehetővé teszi az adatmérnökök vagy adattudósok számára, hogy programozottan meghatározzák és telepítsék ezeket a folyamatokat Python és más ismert konstrukciók segítségével. Az Airflow lényege a DAG vagy irányított aciklikus gráf koncepciója. Az Airflow DAG modellt és API-készletet biztosít a folyamatösszetevők, azok függőségei és végrehajtási sorrendjének meghatározásához.

Előfordulhat, hogy Airflow csővezetékek replikálják az adatokat egy termékadatbázisból egy adattárházba. Más folyamatok olyan lekérdezéseket hajthatnak végre, amelyek normalizált adatokat egyesítenek egyetlen adathalmazba, amely alkalmas elemzésre vagy modellezésre. Egy másik folyamat is közzétehet egy napi jelentést, amely összesíti a legfontosabb üzleti mutatókat. Az ilyen használati esetek közös témája: az adatok rendszerek közötti mozgásának koordinálása. Itt ragyog az Airflow.

Az Airflow és annak gazdag ökoszisztémája kihasználása integrációk, az adatmérnökök és tudósok tetszőleges számú, egymástól eltérő eszközt vagy szolgáltatást egyetlen, könnyen karbantartható és működtethető, egységes csővezetékbe szervezhetnek. Ezen integrációs képességek megértésével most arról kezdünk beszélni, hogy a Gretel hogyan integrálható egy Airflow-folyamatba a gyakori adatkezelési munkafolyamatok javítása érdekében.

Hogy illeszkedik Gretel?

 
 
A Gretelnél az a küldetésünk, hogy egyszerűbbé és biztonságosabbá tegyük az adatokkal való munkát. Ha az ügyfelekkel beszélgetünk, az egyik fájdalmas pont, amiről gyakran hallunk, az, hogy mennyi időre és erőfeszítésre van szükség ahhoz, hogy az adattudósok hozzáférjenek az érzékeny adatokhoz. Használata Gretel szintetikus, csökkenthetjük az érzékeny adatokkal való munkavégzés kockázatát, ha az adatkészletről szintetikus másolatot készítünk. A Gretel és az Airflow integrálásával önkiszolgáló folyamatok hozhatók létre, amelyek megkönnyítik az adattudósok számára, hogy gyorsan hozzáférjenek a szükséges adatokhoz anélkül, hogy minden új adatkéréshez adatmérnökre lenne szükségük.

E képességek bemutatásához ETL-folyamatot építünk, amely kivonja a felhasználói tevékenység jellemzőit egy adatbázisból, előállítja az adatkészlet szintetikus verzióját, és elmenti az adatkészletet az S3-ba. Az S3-ba mentett szintetikus adatkészlettel az adatkutatók felhasználhatják a későbbi modellezéshez vagy elemzéshez anélkül, hogy veszélyeztetnék az ügyfelek magánéletét.

A dolgok elindításához először nézzük meg a csővezetéket madártávlatból. Ezen a diagramon minden csomópont egy folyamatlépést, vagy levegőáramlási kifejezéssel „feladatot” jelent.



Példa Gretel szintetikus csővezetékre az Airflow-n.

 

A folyamatot 3 szakaszra bonthatjuk, hasonlóan ahhoz, amit egy ETL-folyamatban találhat:

  • kivonat – The extract_features task will query a database, and transform the data into a set of features that can be used by data scientists for building models.
  • Synthesize – generate_synthetic_features will take the extracted features as input, train a synthetic model, and then generate a synthetic set of features using Gretel APIs and cloud services.
  • Terhelés – upload_synthetic_features saves the synthetic set of features to S3 where it can be ingested into any downstream model or analysis.

A következő néhány szakaszban mind a három lépést részletesebben megvizsgáljuk. Ha követni szeretné az egyes kódmintákat, lépjen a következőre: gretelai/gretel-airflow-pipelines és töltse le az ebben a blogbejegyzésben használt összes kódot. A repo utasításokat is tartalmaz az Airflow-példány elindításához és a folyamat végétől a végéig történő futtatásához.

Ezenkívül hasznos lehet az Airflow csővezeték teljes megtekintése, mielőtt az egyes komponenseket boncolgatnánk, dags/airbnb_user_bookings.py. A következő szakaszokban található kódrészletek a kapcsolódó felhasználói foglalási folyamatból származnak.

Kivonat funkciók

 
 
Az első feladat, az extract_features a nyers adatok kinyeréséért a forrásadatbázisból, és azok szolgáltatáskészletté alakításáért felelős. Ez gyakori jellemző tervezés probléma, amelyet bármelyik gépi tanulási vagy elemzési folyamatban találhat.

Példafolyamunkban létrehozunk egy PostgreSQL adatbázist, és betöltjük a foglalási adatokkal egy Airbnb Kaggle verseny.

Ez az adatkészlet két táblázatot tartalmaz, a felhasználókat és a munkameneteket. A Munkamenetek egy idegen kulcs hivatkozást tartalmaznak, a user_id. Ezzel a kapcsolattal létrehozunk egy olyan funkciókészletet, amely különféle, felhasználónként összesített foglalási mutatókat tartalmaz. A következő ábra a szolgáltatások felépítéséhez használt SQL-lekérdezést mutatja be.

WITH session_features_by_user AS ( SELECT user_id, count(*) AS number_of_actions_cent, count(DISTINCT action_type) AS number_of_unique_actions, round(átl. másodperc_eltelt)) AS min_session_time_seconds, ( SELECT count(*) FROM sessions s WHERE s.user_id = user_id AND s.action_type = 'booking_request') AS total_bookings FROM sessions GROUP BY user_id
)
SELECT u.id AS user_id, u.gender, u.age, u.language, u.signup_method, u.date_account_created, s.number_of_actions_taen, s.number_of_unique_actions, s.avg_session_time_s.sminsssess_s. sion_time_seconds
FROM session_features_by_user s LEFT JOIN felhasználók u ON u.id = s.user_id
LIMIT 5000


Az SQL-lekérdezést ezután a rendszer az Airflow-folyamatból hajtja végre, és egy közbenső S3-helyre írja a következő feladatdefinícióval.

@feladat()
def kivonat r+", suffix=".csv") mint tmp_csv: postgres.copy_expert( f"másolás ({sql_query}) az stdoutba csv fejléccel", tmp_csv.name ) s3.load_file( fájlnév=tmp_csv.name, key=key, ) visszatérési billentyűt


A feladat bemenete (sql_file) határozza meg, hogy milyen lekérdezést kell futtatni az adatbázisban. Ezt a lekérdezést a rendszer beolvassa a feladatba, majd végrehajtja az adatbázisban. A lekérdezés eredményei ezután az S3-ba íródnak, és a távoli fájlkulcs visszakerül a feladat kimeneteként.

Az alábbi képernyőképen a felülről származó kinyerési lekérdezés mintaeredményei láthatók. A következő részben leírjuk, hogyan hozható létre ennek az adatkészletnek a szintetikus változata.



Lekérdezés eredményének előnézete.

Funkciók szintetizálása Gretel API-k segítségével

 
 
Az egyes szolgáltatások szintetikus verziójának létrehozásához először egy szintetikus modellt kell betanítanunk, majd futtatnunk kell a modellt szintetikus rekordok generálásához. A Gretel Python SDK-kkal rendelkezik, amelyek megkönnyítik az Airflow feladatokba való integrálását.

A Python Client SDK-k mellett létrehoztunk egy Gretel Airflow Hook amely a Gretel API-kapcsolatokat és titkokat kezeli. A Gretel Airflow Connection beállítása után a Gretel API-hoz való csatlakozás olyan egyszerű, mint

from hooks.gretel import GretelHook gretel = GretelHook()
projekt = gretel.get_project()


Az Airflow-kapcsolatok konfigurálásával kapcsolatos további információkért tekintse meg Github-tárházunkat README.

A fenti példában szereplő projektváltozó fő belépési pontként használható a Gretel API-t használó szintetikus modellek betanításához és futtatásához. További részletekért tekintse meg nálunk Python API dokumentumok.

Visszautalva a foglalási folyamatra, most áttekintjük a generate_synthetic_features feladatot. Ez a lépés felelős a szintetikus modell betanításáért az előző feladatban kivont jellemzők segítségével.

@feladat()
def gener_synthetic_features(data_source: str) -> str: project = gretel.get_project() model = project.create_model_obj( model_config="synthetics/default", data_source=s3.download_file(data_source) ) model.submit_cloud() poll(model() return model.get_artifact_link("data_preview")


A metódus aláírását tekintve látni fogja, hogy a data_source útvonalon halad. Ez az érték az előző lépésben kivont S3 jellemzőkre mutat. Egy későbbi részben bemutatjuk, hogy ezek a bemenetek és kimenetek hogyan vannak összekapcsolva.

Amikor a modellt a project.create_model_obj használatával hozza létre, a model_config paraméter a modell létrehozásához használt szintetikus modellkonfigurációt jelöli. Ebben a folyamatban a sajátunkat használjuk alapértelmezett modell konfig, hanem sok más konfigurációs lehetőségek rendelkezésre állnak.

A modell konfigurálása után meghívjuk a model.submit_cloud() függvényt. Ezzel elküldi a modellt képzésre és rekordgenerálásra a Gretel Cloud segítségével. A szavazás(modell) meghívása letiltja a feladatot, amíg a modell be nem fejezi a képzést.

Most, hogy a modell betanításra került, a get_artifact_link segítségével visszaadjuk a generált szintetikus funkciók letöltéséhez szükséges linket.



A szintetikus jellemzőkészlet adat előnézete.

 

Ez a mellékterméklink az utolsó upload_synthetic_features lépés bemeneteként lesz használva.

Szintetikus funkciók betöltése

 
 
Az eredeti jellemzők ki lettek bontva, és létrejött a szintetikus változat. Most itt az ideje feltölteni a szintetikus funkciókat, hogy a későbbi fogyasztók is hozzáférhessenek. Ebben a példában egy S3 tárolót fogunk használni az adatkészlet végső célhelyeként.

@feladat()
def upload_synthetic_features(data_set: str): context = get_current_context() with open(data_set, "rb") as synth_features: s3.load_file_obj( file_obj=synth_features, key=f"{..._booking_features_v",synthetic)cs.


Ez a feladat meglehetősen egyszerű. A data_set bemeneti érték egy aláírt HTTP-hivatkozást tartalmaz a szintetikus adatkészlet letöltéséhez a Gretel API-ból. A feladat beolvassa a fájlt az Airflow workerbe, majd a már konfigurált S3 hook segítségével feltölti a szintetikus jellemzőfájlt egy S3 tárolóba, ahol a későbbi fogyasztók vagy modellek hozzáférhetnek.

A Pipeline hangszerelése

 
 
Az utolsó három részben végigjártuk az összes kódot, amely egy adatkészlet kinyeréséhez, szintetizálásához és betöltéséhez szükséges. Az utolsó lépés ezeknek a feladatoknak a összekapcsolása egyetlen Airflow csővezetékbe.

Ha visszaemlékezik a bejegyzés elejére, röviden megemlítettük a DAG fogalmát. Az Airflow TaskFlow API használatával ezt a három Python metódust összeállíthatjuk egy DAG-ba, amely meghatározza a bemeneteket, kimeneteket és az egyes lépések végrehajtásának sorrendjét.

feature_path = kivonat_szolgáltatások("/opt/airflow/dags/sql/session_rollups__by_user.sql")
synthetic_data = gener_synthetic_features(feature_path)
upload_synthetic_features(synthetic_data)


Ha követi ezen metódushívások útvonalát, akkor végül olyan grafikont kap, amely úgy néz ki, mint az eredeti szolgáltatásfolyamat.



Gretel szintetikus csővezeték az Airflow-n.

 

Ha szeretné futtatni ezt a folyamatot, és látni szeretné működés közben, lépjen a következő oldalra: kísérő Github adattárat. Itt talál utasításokat egy Airflow példány indításához és a folyamat végétől a végéig történő futtatásához.

A dolgok csomagolása

 
 
Ha eljutott idáig, akkor láthatta, hogy a Gretel hogyan integrálható az Airflow-ra épített adatfolyamba. A Gretel fejlesztőbarát API-jainak, valamint az Airflow nagy teljesítményű akasztórendszerének és kezelőinek kombinálásával könnyű olyan ETL-folyamatokat építeni, amelyek könnyebben hozzáférhetővé és biztonságosabbá teszik az adatokat.

Beszéltünk egy általános jellemző mérnöki felhasználási esetről is, amikor az érzékeny adatok nem biztos, hogy könnyen hozzáférhetők. Az adatkészlet szintetikus verziójának létrehozásával csökkentjük az érzékeny adatok felfedésének kockázatát, de továbbra is megőrizzük az adatkészlet hasznosságát, miközben gyorsan elérhetővé tesszük azok számára, akiknek szükségük van rá.

Ha a szolgáltatásfolyamatról absztraktabb kifejezésekkel gondolunk, most van egy mintánk, amely tetszőleges számú új SQL-lekérdezéshez felhasználható. A folyamat új verziójának üzembe helyezésével és a kezdeti SQL-lekérdezés kicserélésével minden potenciálisan érzékeny lekérdezést szintetikus adatkészlettel kezelhetünk, amely megőrzi az ügyfelek adatait. Az egyetlen kódsor, amelyet módosítani kell, az az sql fájl elérési útja. Nincs szükség bonyolult adatkezelésre.

Köszönöm, hogy elolvasta

 
 
Küldjön egy e-mailt a hi@gretel.ai vagy csatlakozz hozzánk Laza ha bármilyen kérdése vagy észrevétele van. Szeretnénk hallani arról, hogyan használja az Airflow-t, és hogyan tudjuk a legjobban integrálni meglévő adatfolyamaival.

 
Bio: Drew Newberry a Gretel.ai szoftvermérnök.

eredeti. Engedéllyel újra közzétéve.

Kapcsolódó:

Forrás: https://www.kdnuggets.com/2021/09/build-synthetic-data-pipeline-gretel-apache-airflow.html

Időbélyeg:

Még több KDnuggets