Cree una canalización de datos sintéticos con Gretel y Apache Airflow

Nodo de origen: 1068200

Cree una canalización de datos sintéticos con Gretel y Apache Airflow

En esta publicación de blog, creamos una canalización ETL que genera datos sintéticos a partir de una base de datos PostgreSQL utilizando las API de datos sintéticos de Gretel y Apache Airflow.


By Dibujó Newberry, Ingeniero de software en Gretel.ai

Cree una canalización de datos sintéticos con Gretel y Apache Airflow

Hola amigos, mi nombre es Drew y soy ingeniero de software aquí en Gretel. Recientemente, he estado pensando en patrones para integrar las API de Gretel en herramientas existentes para que sea fácil construir canales de datos donde la seguridad y la privacidad del cliente son características de primera clase, no solo una ocurrencia tardía o una casilla para marcar.

Una herramienta de ingeniería de datos que es popular entre los ingenieros y clientes de Gretel es Apache Airflow. También funciona muy bien con Gretel. En esta publicación de blog, le mostraremos cómo crear una canalización de datos sintéticos utilizando Airflow, Gretel y PostgreSQL. ¡Saltemos!

¿Qué es el flujo de aire?

 
 
Flujo de aire es una herramienta de automatización del flujo de trabajo que se utiliza habitualmente para crear canalizaciones de datos. Permite a los ingenieros de datos o científicos de datos definir e implementar de manera programática estas canalizaciones utilizando Python y otras construcciones familiares. En el núcleo de Airflow se encuentra el concepto de un DAG o gráfico acíclico dirigido. Un DAG de flujo de aire proporciona un modelo y un conjunto de API para definir los componentes de la tubería, sus dependencias y el orden de ejecución.

Puede encontrar canalizaciones de Airflow que replican datos de una base de datos de productos en un almacén de datos. Otras canalizaciones pueden ejecutar consultas que unen datos normalizados en un único conjunto de datos adecuado para análisis o modelado. Sin embargo, otra canalización podría publicar un informe diario que agregue métricas comerciales clave. Un tema común compartido entre estos casos de uso: coordinar el movimiento de datos entre sistemas. Aquí es donde brilla Airflow.

Aprovechando Airflow y su rico ecosistema de integraciones, los ingenieros de datos y los científicos pueden orquestar cualquier número de herramientas o servicios dispares en una única tubería unificada que sea fácil de mantener y operar. Con una comprensión de estas capacidades de integración, ahora comenzaremos a hablar sobre cómo Gretel podría integrarse en una tubería de Airflow para mejorar los flujos de trabajo de operaciones de datos comunes.

¿Cómo encaja Gretel?

 
 
En Gretel, nuestra misión es hacer que trabajar con los datos sea más fácil y seguro. Al hablar con los clientes, uno de los puntos débiles que a menudo escuchamos es el tiempo y el esfuerzo necesarios para que los científicos de datos accedan a datos confidenciales. Utilizando Sintéticos Gretel, podemos reducir el riesgo de trabajar con datos confidenciales generando una copia sintética del conjunto de datos. Al integrar Gretel con Airflow, es posible crear canalizaciones de autoservicio que facilitan a los científicos de datos obtener rápidamente los datos que necesitan sin requerir un ingeniero de datos para cada nueva solicitud de datos.

Para demostrar estas capacidades, crearemos una canalización ETL que extrae las características de la actividad del usuario de una base de datos, genera una versión sintética del conjunto de datos y guarda el conjunto de datos en S3. Con el conjunto de datos sintéticos guardado en S3, los científicos de datos pueden utilizarlo para el modelado o análisis posterior sin comprometer la privacidad del cliente.

Para comenzar, primero echemos una vista de pájaro del oleoducto. Cada nodo de este diagrama representa un paso de canalización o "tarea" en términos de Airflow.



Ejemplo de canalización de sintéticos de Gretel en Airflow.

 

Podemos dividir la canalización en 3 etapas, similar a lo que podría encontrar en una canalización ETL:

  • Extraer - La tarea extract_features consultará una base de datos y transformará los datos en un conjunto de características que los científicos de datos pueden usar para construir modelos.
  • Sintetizar - generate_synthetic_features tomará las características extraídas como entrada, entrenará un modelo sintético y luego generará un conjunto sintético de características utilizando las API de Gretel y los servicios en la nube.
  • Carga - upload_synthetic_features guarda el conjunto sintético de características en S3, donde se puede incorporar a cualquier modelo o análisis posterior.

En las siguientes secciones, profundizaremos en cada uno de estos tres pasos con mayor detalle. Si desea seguir cada muestra de código, puede dirigirse a gretelai / gretel-airflow-pipelines y descargue todo el código utilizado en esta publicación de blog. El repositorio también contiene instrucciones que puede seguir para iniciar una instancia de Airflow y ejecutar la canalización de un extremo a otro.

Además, puede ser útil ver la tubería de Airflow en su totalidad, antes de analizar cada componente, dags / airbnb_user_bookings.py. Los fragmentos de código de las siguientes secciones se extraen de la canalización de reservas de usuarios vinculados.

Extraer características

 
 
La primera tarea, extract_features es responsable de extraer datos sin procesar de la base de datos de origen y transformarlos en un conjunto de características. Esta es una ingeniería de características problema que puede encontrar en cualquier proceso de análisis o aprendizaje automático.

En nuestra canalización de ejemplo, aprovisionaremos una base de datos PostgreSQL y la cargaremos con datos de reserva de un Concurso Airbnb Kaggle.

Este conjunto de datos contiene dos tablas, Usuarios y Sesiones. Las sesiones contienen una referencia de clave externa, user_id. Usando esta relación, crearemos un conjunto de características que contienen varias métricas de reserva agregadas por usuario. La siguiente figura representa la consulta SQL utilizada para construir las características.

CON session_features_by_user AS (SELECT user_id, count (*) AS number_of_actions_taken, count (DISTINCT action_type) AS number_of_unique_actions, round (avg (secs_elapsed)) AS avg_session_time_seconds, round (max (sectimes_elapsed_elapses) AS min_session_time_seconds, (SELECCIONAR recuento (*) FROM sesiones s DONDE s.user_id = user_id AND s.action_type = 'booking_request') AS total_bookings FROM sesiones GROUP BY user_id) SELECT u.id AS user_id, u.gender, u.age, u .idioma, u.signup_method, u.date_account_created, s.number_of_actions_taken, s.number_of_unique_actions, s.avg_session_time_seconds, s.min_session_time_seconds, s.max_session_time_seconds ONEF_user_user s.features


Luego, la consulta SQL se ejecuta desde nuestra tubería de Airflow y se escribe en una ubicación intermedia de S3 utilizando la siguiente definición de tarea.

@task () def extract_features (sql_file: str) -> str: context = get_current_context () sql_query = Path (sql_file) .read_text () key = f "{context ['dag_run']. run_id} _booking_features.csv" con NamedTemporaryFile (mode = "r +", sufijo = ". csv") como tmp_csv: postgres.copy_expert (f "copiar ({sql_query}) a stdout con encabezado csv", tmp_csv.name) s3.load_file (filename = tmp_csv.name, clave = clave,) tecla de retorno


La entrada a la tarea, sql_file, determina qué consulta ejecutar en la base de datos. Esta consulta se leerá en la tarea y luego se ejecutará en la base de datos. Los resultados de la consulta se escribirán en S3 y la clave del archivo remoto se devolverá como resultado de la tarea.

La siguiente captura de pantalla muestra un conjunto de resultados de muestra de la consulta de extracción desde arriba. Describiremos cómo crear una versión sintética de este conjunto de datos en la siguiente sección.



Vista previa del resultado de la consulta.

Sintetizar funciones usando las API de Gretel

 
 
Para generar una versión sintética de cada característica, primero debemos entrenar un modelo sintético y luego ejecutar el modelo para generar registros sintéticos. Gretel tiene un conjunto de SDK de Python que facilitan la integración en las tareas de Airflow.

Además de los SDK de Python Client, hemos creado un Gancho de flujo de aire Gretel que gestiona las conexiones y los secretos de la API de Gretel. Después de configurar una conexión de flujo de aire Gretel, conectarse a la API de Gretel es tan fácil como

de hooks.gretel importar GretelHook gretel = GretelHook () proyecto = gretel.get_project ()


Para obtener más información sobre cómo configurar las conexiones de Airflow, consulte nuestro repositorio de Github README.

La variable del proyecto en el ejemplo anterior se puede usar como el punto de entrada principal para entrenar y ejecutar modelos sintéticos usando la API de Gretel. Para obtener más detalles, puede consultar nuestro Documentos de la API de Python.

Volviendo a la canalización de reservas, ahora revisaremos la tarea generate_synthetic_features. Este paso es responsable de entrenar el modelo sintético utilizando las características extraídas en la tarea anterior.

@task () def generate_synthetic_features (data_source: str) -> str: project = gretel.get_project () model = project.create_model_obj (model_config = "Synthetics / default", data_source = s3.download_file (data_source)) model.submit_cloud () encuesta (modelo) return model.get_artifact_link ("data_preview")


Al observar la firma del método, verá que toma una ruta, data_source. Este valor apunta a las características de S3 extraídas en el paso anterior. En una sección posterior, veremos cómo se conectan todas estas entradas y salidas.

Al crear el modelo usando project.create_model_obj, el parámetro model_config representa la configuración del modelo sintético que se usa para generar el modelo. En esta canalización, estamos usando nuestro configuración de modelo predeterminada, pero muchas otras opciones de configuración están disponibles.

Una vez configurado el modelo, lo llamamos model.submit_cloud (). Esto enviará el modelo para entrenamiento y generación de registros usando Gretel Cloud. Llamar a la encuesta (modelo) bloqueará la tarea hasta que el modelo haya completado el entrenamiento.

Ahora que se ha entrenado el modelo, usaremos get_artifact_link para devolver un enlace para descargar las características sintéticas generadas.



Vista previa de datos del conjunto sintético de características.

 

Este enlace de artefacto se utilizará como entrada para el paso final upload_synthetic_features.

Cargar características sintéticas

 
 
Se han extraído las características originales y se ha creado una versión sintética. Ahora es el momento de cargar las funciones sintéticas para que los consumidores intermedios puedan acceder a ellas. En este ejemplo, usaremos un depósito de S3 como destino final para el conjunto de datos.

@task () def upload_synthetic_features (data_set: str): context = get_current_context () con open (data_set, "rb") como synth_features: s3.load_file_obj (file_obj = synth_features, key = f "{..._ booking_features_synthetic.cs )


Esta tarea es bastante sencilla. El valor de entrada del conjunto de datos contiene un enlace HTTP firmado para descargar el conjunto de datos sintéticos de la API de Gretel. La tarea leerá ese archivo en el trabajador de Airflow y luego usará el enlace S3 ya configurado para cargar el archivo de características sintéticas en un depósito S3 donde los consumidores o modelos posteriores puedan acceder a él.

Orquestando el Pipeline

 
 
En las últimas tres secciones, analizamos todo el código necesario para extraer, sintetizar y cargar un conjunto de datos. El último paso es unir cada una de estas tareas en una única tubería de flujo de aire.

Si recuerda el comienzo de esta publicación, mencionamos brevemente el concepto de un DAG. Usando la API TaskFlow de Airflow, podemos componer estos tres métodos de Python en un DAG que define las entradas, salidas y el orden en que se ejecutará cada paso.

feature_path = extract_features ("/opt/airflow/dags/sql/session_rollups__by_user.sql") datos_sintéticos = generate_synthetic_features (feature_path) upload_synthetic_features (datos_sintéticos)


Si sigue la ruta de estas llamadas a métodos, eventualmente obtendrá un gráfico que se parece a nuestra canalización de características original.



Canalización de sintéticos de Gretel en Airflow.

 

Si desea ejecutar esta canalización y verla en acción, diríjase al que acompaña al repositorio de Github. Allí encontrará instrucciones sobre cómo iniciar una instancia de Airflow y ejecutar la canalización de un extremo a otro.

Envolviendo cosas

 
 
Si ha llegado hasta aquí, ha visto cómo se puede integrar Gretel en una canalización de datos basada en Airflow. Al combinar las API amigables para desarrolladores de Gretel y el poderoso sistema de ganchos y operadores de Airflow, es fácil construir tuberías ETL que hacen que los datos sean más accesibles y más seguros de usar.

También hablamos sobre un caso de uso común de ingeniería de funciones en el que es posible que no se pueda acceder fácilmente a datos confidenciales. Al generar una versión sintética del conjunto de datos, reducimos el riesgo de exponer cualquier dato sensible, pero aún conservamos la utilidad del conjunto de datos mientras lo ponemos rápidamente a disposición de quienes lo necesitan.

Pensando en la canalización de funciones en términos más abstractos, ahora tenemos un patrón que se puede reutilizar para cualquier número de nuevas consultas SQL. Al implementar una nueva versión de la canalización e intercambiar la consulta SQL inicial, podemos enfrentar cualquier consulta potencialmente sensible con un conjunto de datos sintéticos que preserva la privacidad del cliente. La única línea de código que debe cambiar es la ruta al archivo sql. No se requiere ingeniería de datos compleja.

Gracias por leer

 
 
Envíanos un correo electrónico a hola@gretel.ai o ven y únete a nosotros Flojo si tiene alguna pregunta o comentario. Nos encantaría saber cómo está utilizando Airflow y cómo podemos integrarnos mejor con sus canales de datos existentes.

 
Bio: Dibujó Newberry es ingeniero de software en Gretel.ai.

Original. Publicado de nuevo con permiso.

Relacionado:

Fuente: https://www.kdnuggets.com/2021/09/build-synthetic-data-pipeline-gretel-apache-airflow.html

Sello de tiempo:

Mas de nuggets