Construire un pipeline de données synthétiques à l'aide de Gretel et Apache Airflow

Nœud source: 1068200

Construire un pipeline de données synthétiques à l'aide de Gretel et Apache Airflow

Dans cet article de blog, nous construisons un pipeline ETL qui génère des données synthétiques à partir d'une base de données PostgreSQL à l'aide des API de données synthétiques de Gretel et d'Apache Airflow.


By Drew Newberry, Ingénieur logiciel chez Gretel.ai

Construire un pipeline de données synthétiques à l'aide de Gretel et Apache Airflow

Salut les amis, je m'appelle Drew et je suis ingénieur logiciel chez Gretel. J'ai récemment réfléchi à des modèles d'intégration des API Gretel dans les outils existants afin qu'il soit facile de créer des pipelines de données où la sécurité et la confidentialité des clients sont des fonctionnalités de première classe, et pas seulement une réflexion après coup ou une case à cocher.

Un outil d'ingénierie des données très apprécié des ingénieurs et des clients de Gretel est Apache Airflow. Cela fonctionne aussi très bien avec Gretel. Dans cet article de blog, nous allons vous montrer comment créer un pipeline de données synthétiques à l'aide d'Airflow, Gretel et PostgreSQL. Allons-y !

Qu'est-ce que le flux d'air

 
 
Débit d'air est un outil d'automatisation de workflow couramment utilisé pour créer des pipelines de données. Il permet aux ingénieurs de données ou aux scientifiques des données de définir et de déployer ces pipelines par programmation à l'aide de Python et d'autres constructions familières. Au cœur d'Airflow se trouve le concept de DAG, ou graphe acyclique dirigé. Un DAG Airflow fournit un modèle et un ensemble d'API pour définir les composants du pipeline, leurs dépendances et l'ordre d'exécution.

Vous pouvez trouver des pipelines Airflow répliquant les données d'une base de données de produits dans un entrepôt de données. D'autres pipelines peuvent exécuter des requêtes qui joignent des données normalisées dans un ensemble de données unique adapté à l'analyse ou à la modélisation. Un autre pipeline peut publier un rapport quotidien agrégeant des mesures commerciales clés. Un thème commun partagé entre ces cas d'utilisation : coordonner le mouvement des données entre les systèmes. C'est là qu'Airflow brille.

Tirer parti d'Airflow et de son riche écosystème de intégrations, les ingénieurs de données et les scientifiques peuvent orchestrer n'importe quel nombre d'outils ou de services disparates dans un seul pipeline unifié, facile à entretenir et à utiliser. Avec une compréhension de ces capacités d'intégration, nous allons maintenant commencer à parler de la façon dont Gretel pourrait être intégré dans un pipeline Airflow pour améliorer les flux de travail courants des opérations de données.

Comment Gretel s'intègre-t-elle ?

 
 
Chez Gretel, notre mission est de rendre le travail des données plus simple et plus sûr. En discutant avec les clients, l'un des problèmes dont nous entendons souvent parler est le temps et les efforts nécessaires pour que les scientifiques des données accèdent aux données sensibles. À l'aide de Gretel Synthétiques, nous pouvons réduire le risque de travailler avec des données sensibles en générant une copie synthétique de l'ensemble de données. En intégrant Gretel à Airflow, il est possible de créer des pipelines en libre-service qui permettent aux data scientists d'obtenir rapidement les données dont ils ont besoin sans avoir besoin d'un ingénieur de données pour chaque nouvelle demande de données.

Pour démontrer ces capacités, nous allons créer un pipeline ETL qui extrait les fonctionnalités d'activité des utilisateurs d'une base de données, génère une version synthétique de l'ensemble de données et enregistre l'ensemble de données sur S3. Avec l'ensemble de données synthétique enregistré dans S3, il peut ensuite être utilisé par les scientifiques des données pour la modélisation ou l'analyse en aval sans compromettre la confidentialité des clients.

Pour commencer, commençons par une vue d'ensemble du pipeline. Chaque nœud de ce diagramme représente une étape de pipeline, ou « tâche » en termes de flux d'air.



Exemple de pipeline de produits synthétiques Gretel sur Airflow.

 

Nous pouvons diviser le pipeline en 3 étapes, similaires à ce que vous pourriez trouver dans un pipeline ETL :

  • Extraction – La tâche extract_features interrogera une base de données et transformera les données en un ensemble de fonctionnalités pouvant être utilisées par les data scientists pour créer des modèles.
  • Synthétiser – generate_synthetic_features prendra les fonctionnalités extraites en entrée, formera un modèle synthétique, puis générera un ensemble synthétique de fonctionnalités à l'aide des API Gretel et des services cloud.
  • LesCharge – upload_synthetic_features enregistre l'ensemble synthétique de fonctionnalités sur S3 où il peut être ingéré dans n'importe quel modèle ou analyse en aval.

Dans les prochaines sections, nous approfondirons chacune de ces trois étapes. Si vous souhaitez suivre chaque exemple de code, vous pouvez vous rendre sur Gretelai/gretel-airflow-pipelines et téléchargez tout le code utilisé dans cet article de blog. Le référentiel contient également des instructions que vous pouvez suivre pour démarrer une instance Airflow et exécuter le pipeline de bout en bout.

De plus, il peut être utile de visualiser le pipeline Airflow dans son intégralité, avant de disséquer chaque composant, dags/airbnb_user_bookings.py. Les extraits de code dans les sections suivantes sont extraits du pipeline de réservation d'utilisateurs lié.

Extraire les fonctionnalités

 
 
La première tâche, extract_features, est chargée d'extraire les données brutes de la base de données source et de les transformer en un ensemble de fonctionnalités. C'est une commune ingénierie des fonctionnalités problème que vous pourriez rencontrer dans n'importe quel pipeline d'apprentissage automatique ou d'analyse.

Dans notre exemple de pipeline, nous allons provisionner une base de données PostgreSQL et la charger avec les données de réservation d'un Concours Airbnb Kaggle.

Cet ensemble de données contient deux tables, Users et Sessions. Sessions contient une référence de clé étrangère, user_id. En utilisant cette relation, nous allons créer un ensemble de fonctionnalités contenant diverses métriques de réservation agrégées par utilisateur. La figure suivante représente la requête SQL utilisée pour créer les fonctionnalités.

AVEC session_features_by_user AS ( SELECT user_id, count(*) AS number_of_actions_taken, count(DISTINCT action_type) AS number_of_unique_actions, round(avg(secs_elapsed)) AS avg_session_time_seconds, round(max(secs_elapsed)) AS max_ AS_session_time(seconds), min_session_time_seconds, ( SELECT count(*) FROM sessions s WHERE s.user_id = user_id AND s.action_type = 'booking_request') AS total_bookings FROM sessions GROUP BY user_id ) SELECT u.id AS user_id, u.gender, u.age, u .language, u.signup_method, u.date_account_created, s.number_of_actions_taken, s.number_of_unique_actions, s.avg_session_time_seconds, s.min_session_time_seconds, s.max_session_time_seconds FROM session_features_by_JOIN s LEFT utilisateurs


La requête SQL est ensuite exécutée à partir de notre pipeline Airflow et écrite dans un emplacement S3 intermédiaire à l'aide de la définition de tâche suivante.

@task() def extract_features(sql_file: str) -> str: context = get_current_context() sql_query = Path(sql_file).read_text() key = f"{context['dag_run'].run_id}_booking_features.csv" avec NamedTemporaryFile (mode="r+", suffix=".csv") comme tmp_csv: postgres.copy_expert( f"copy ({sql_query}) vers stdout avec en-tête csv", tmp_csv.name ) s3.load_file( filename=tmp_csv.name, clé=clé, ) clé de retour


L'entrée de la tâche, sql_file, détermine la requête à exécuter sur la base de données. Cette requête sera lue dans la tâche puis exécutée sur la base de données. Les résultats de la requête seront ensuite écrits dans S3 et la clé de fichier distant sera renvoyée en tant que sortie de la tâche.

La capture d'écran ci-dessous montre un exemple d'ensemble de résultats de la requête d'extraction ci-dessus. Nous décrirons comment créer une version synthétique de cet ensemble de données dans la section suivante.



Aperçu des résultats de la requête.

Synthétiser des fonctionnalités à l'aide des API Gretel

 
 
Pour générer une version synthétique de chaque fonctionnalité, nous devons d'abord entraîner un modèle synthétique, puis exécuter le modèle pour générer des enregistrements synthétiques. Gretel dispose d'un ensemble de SDK Python qui facilitent l'intégration dans les tâches Airflow.

En plus des SDK client Python, nous avons créé un Crochet de flux d'air Gretel qui gère les connexions et les secrets de l'API Gretel. Après avoir configuré une connexion Gretel Airflow, la connexion à l'API Gretel est aussi simple que

de hooks.gretel import GretelHook gretel = GretelHook() project = gretel.get_project()


Pour plus d'informations sur la configuration des connexions Airflow, veuillez vous référer à notre référentiel Github README.

La variable de projet dans l'exemple ci-dessus peut être utilisée comme point d'entrée principal pour la formation et l'exécution de modèles synthétiques à l'aide de l'API de Gretel. Pour plus de détails, vous pouvez consulter notre Documentation de l'API Python.

En revenant au pipeline de réservation, nous allons maintenant passer en revue la tâche generate_synthetic_features. Cette étape est chargée d'entraîner le modèle synthétique à l'aide des caractéristiques extraites dans la tâche précédente.

@task() def generate_synthetic_features(data_source: str) -> str: project = gretel.get_project() model = project.create_model_obj( model_config="synthetics/default", data_source=s3.download_file(data_source) ) model.submit_cloud() poll(model) return model.get_artifact_link("data_preview")


En regardant la signature de la méthode, vous verrez qu'elle prend un chemin, data_source. Cette valeur pointe vers les caractéristiques S3 extraites à l'étape précédente. Dans une section ultérieure, nous expliquerons comment toutes ces entrées et sorties sont câblées ensemble.

Lors de la création du modèle à l'aide de project.create_model_obj, le paramètre model_config représente la configuration du modèle synthétique utilisée pour générer le modèle. Dans ce pipeline, nous utilisons notre configuration du modèle par défaut, mais bien d'autres les options de configuration sont disponibles.

Une fois le modèle configuré, nous appelons model.submit_cloud(). Cela soumettra le modèle à la formation et à la génération d'enregistrements à l'aide de Gretel Cloud. L'appel de poll(model) bloquera la tâche jusqu'à ce que le modèle ait terminé l'entraînement.

Maintenant que le modèle a été entraîné, nous allons utiliser get_artifact_link pour renvoyer un lien permettant de télécharger les fonctionnalités synthétiques générées.



Aperçu des données de l'ensemble synthétique de fonctionnalités.

 

Ce lien d'artefact sera utilisé comme entrée pour l'étape finale upload_synthetic_features.

Charger des fonctionnalités synthétiques

 
 
Les caractéristiques originales ont été extraites et une version synthétique a été créée. Il est maintenant temps de télécharger les fonctionnalités synthétiques afin qu'elles soient accessibles aux consommateurs en aval. Dans cet exemple, nous allons utiliser un compartiment S3 comme destination finale pour l'ensemble de données.

@task() def upload_synthetic_features(data_set: str): context = get_current_context() avec open(data_set, "rb") comme synth_features: s3.load_file_obj( file_obj=synth_features, key=f"{..._booking_features_synthetic.csv", )


Cette tâche est assez simple. La valeur d'entrée data_set contient un lien HTTP signé pour télécharger l'ensemble de données synthétique à partir de l'API de Gretel. La tâche lira ce fichier dans le nœud de calcul Airflow, puis utilisera le hook S3 déjà configuré pour télécharger le fichier de fonctionnalités synthétiques dans un compartiment S3 où les consommateurs ou les modèles en aval peuvent y accéder.

Orchestrer le pipeline

 
 
Au cours des trois dernières sections, nous avons parcouru tout le code requis pour extraire, synthétiser et charger un ensemble de données. La dernière étape consiste à lier chacune de ces tâches dans un seul pipeline Airflow.

Si vous vous souvenez du début de cet article, nous avons brièvement mentionné le concept de DAG. À l'aide de l'API TaskFlow d'Airflow, nous pouvons composer ces trois méthodes Python dans un DAG qui définit les entrées, les sorties et l'ordre d'exécution de chaque étape.

feature_path = extract_features( "/opt/airflow/dags/sql/session_rollups__by_user.sql" ) Synthetic_data = generate_synthetic_features(feature_path) upload_synthetic_features(synthetic_data)


Si vous suivez le chemin de ces appels de méthode, vous obtiendrez éventuellement un graphique qui ressemble à notre pipeline de fonctionnalités d'origine.



Pipeline de synthèse Gretel sur Airflow.

 

Si vous souhaitez exécuter ce pipeline et le voir en action, rendez-vous sur le référentiel Github d'accompagnement. Vous y trouverez des instructions sur la façon de démarrer une instance Airflow et d'exécuter le pipeline de bout en bout.

Envelopper les choses

 
 
Si vous êtes arrivé jusqu'ici, vous avez vu comment Gretel peut être intégré dans un pipeline de données construit sur Airflow. En combinant les API conviviales pour les développeurs de Gretel et le puissant système de crochets et d'opérateurs d'Airflow, il est facile de créer des pipelines ETL qui rendent les données plus accessibles et plus sûres à utiliser.

Nous avons également parlé d'un cas d'utilisation courant d'ingénierie de fonctionnalités où les données sensibles peuvent ne pas être facilement accessibles. En générant une version synthétique de l'ensemble de données, nous réduisons le risque d'exposer des données sensibles, tout en conservant l'utilité de l'ensemble de données tout en le rendant rapidement accessible à ceux qui en ont besoin.

En pensant au pipeline de fonctionnalités en termes plus abstraits, nous avons maintenant un modèle qui peut être réutilisé pour n'importe quel nombre de nouvelles requêtes SQL. En déployant une nouvelle version du pipeline et en remplaçant la requête SQL initiale, nous pouvons faire face à toute requête potentiellement sensible avec un ensemble de données synthétiques qui préserve la confidentialité des clients. La seule ligne de code qui doit changer est le chemin d'accès au fichier sql. Aucune ingénierie de données complexe n'est requise.

Merci d'avoir lu

 
 
Envoyez-nous un email à salut@gretel.ai ou venez nous rejoindre Slack si vous avez des questions ou des commentaires. Nous aimerions savoir comment vous utilisez Airflow et comment nous pouvons nous intégrer au mieux à vos pipelines de données existants.

 
Bio: Drew Newberry est ingénieur logiciel chez Gretel.ai.

ORIGINALE. Republié avec permission.

Connexe:

Source : https://www.kdnuggets.com/2021/09/build-synthetic-data-pipeline-gretel-apache-airflow.html

Horodatage:

Plus de KDnuggetsGenericName