Crie um pipeline de dados sintéticos usando Gretel e Apache Airflow

Nó Fonte: 1068200

Crie um pipeline de dados sintéticos usando Gretel e Apache Airflow

Nesta postagem do blog, criamos um pipeline ETL que gera dados sintéticos de um banco de dados PostgreSQL usando as APIs de dados sintéticos da Gretel e o Apache Airflow.


By Drew Newberry, Engenheiro de Software na Gretel.ai

Crie um pipeline de dados sintéticos usando Gretel e Apache Airflow

Ei pessoal, meu nome é Drew e sou engenheiro de software aqui na Gretel. Recentemente, tenho pensado em padrões para integrar APIs Gretel em ferramentas existentes para que seja fácil construir pipelines de dados onde a segurança e a privacidade do cliente sejam recursos de primeira classe, não apenas uma reflexão tardia ou uma caixa a ser verificada.

Uma ferramenta de engenharia de dados que é popular entre os engenheiros e clientes da Gretel é o Apache Airflow. Também funciona muito bem com Gretel. Nesta postagem do blog, mostraremos como criar um pipeline de dados sintéticos usando Airflow, Gretel e PostgreSQL. Vamos pular!

O que é fluxo de ar

 
 
O fluxo de ar é uma ferramenta de automação de fluxo de trabalho comumente usada para construir pipelines de dados. Ele permite que engenheiros de dados ou cientistas de dados definam e implantem programaticamente esses pipelines usando Python e outras construções familiares. No centro do Airflow está o conceito de DAG, ou gráfico acíclico direcionado. Um Airflow DAG fornece um modelo e um conjunto de APIs para definir componentes de pipeline, suas dependências e ordem de execução.

Você pode encontrar pipelines do Airflow replicando dados de um banco de dados de produto em um data warehouse. Outros pipelines podem executar consultas que unem dados normalizados em um único conjunto de dados adequado para análise ou modelagem. Ainda outro pipeline pode publicar um relatório diário agregando as principais métricas de negócios. Um tema comum compartilhado entre esses casos de uso: coordenar a movimentação de dados entre sistemas. É aqui que o Airflow brilha.

Aproveitando o Airflow e seu rico ecossistema de integrações, engenheiros de dados e cientistas podem orquestrar qualquer número de ferramentas ou serviços diferentes em um único pipeline unificado que é fácil de manter e operar. Com uma compreensão desses recursos de integração, agora começaremos a falar sobre como o Gretel pode ser integrado a um pipeline do Airflow para melhorar os fluxos de trabalho comuns de operações de dados.

Como Gretel se encaixa?

 
 
Na Gretel, nossa missão é tornar os dados mais fáceis e seguros de se trabalhar. Falando com os clientes, um ponto problemático que ouvimos com frequência é o tempo e o esforço necessários para que os cientistas de dados tenham acesso a dados confidenciais. Usando Gretel Sintéticos, podemos reduzir o risco de trabalhar com dados confidenciais gerando uma cópia sintética do conjunto de dados. Ao integrar o Gretel ao Airflow, é possível criar pipelines de autoatendimento que facilitam para os cientistas de dados obterem rapidamente os dados de que precisam sem a necessidade de um engenheiro de dados para cada nova solicitação de dados.

Para demonstrar esses recursos, criaremos um pipeline de ETL que extrai recursos de atividade do usuário de um banco de dados, gera uma versão sintética do conjunto de dados e salva o conjunto de dados no S3. Com o conjunto de dados sintético salvo no S3, ele pode ser usado por cientistas de dados para modelagem ou análise downstream sem comprometer a privacidade do cliente.

Para começar, vamos primeiro ter uma visão geral do pipeline. Cada nó neste diagrama representa uma etapa do pipeline ou “tarefa” nos termos do Airflow.



Exemplo de pipeline de sintéticos Gretel no Airflow.

 

Podemos dividir o pipeline em 3 estágios, semelhantes ao que você pode encontrar em um pipeline ETL:

  • Extrair – A tarefa extract_features consultará um banco de dados e transformará os dados em um conjunto de recursos que podem ser usados ​​por cientistas de dados para construir modelos.
  • Sintetizar – generate_synthetic_features usará os recursos extraídos como entrada, treinará um modelo sintético e, em seguida, gerará um conjunto sintético de recursos usando APIs Gretel e serviços em nuvem.
  • Ver – upload_synthetic_features salva o conjunto sintético de recursos no S3, onde pode ser ingerido em qualquer modelo ou análise downstream.

Nas próximas seções, vamos mergulhar em cada uma dessas três etapas com mais detalhes. Se você deseja acompanhar cada amostra de código, você pode ir para gretelai/gretel-airflow-pipelines e baixe todo o código usado neste post do blog. O repositório também contém instruções que você pode seguir para iniciar uma instância do Airflow e executar o pipeline de ponta a ponta.

Além disso, pode ser útil visualizar o pipeline do Airflow em sua totalidade, antes de dissecar cada componente, dags/airbnb_user_bookings.py. Os trechos de código nas seções a seguir são extraídos do pipeline de reserva de usuário vinculado.

Extrair recursos

 
 
A primeira tarefa, extract_features, é responsável por extrair dados brutos do banco de dados de origem e transformá-los em um conjunto de recursos. Isso é comum engenharia de recursos problema que você pode encontrar em qualquer aprendizado de máquina ou pipeline de análise.

Em nosso pipeline de exemplo, provisionaremos um banco de dados PostgreSQL e o carregaremos com dados de reserva de um Competição Kaggle do Airbnb.

Este conjunto de dados contém duas tabelas, Usuários e Sessões. Sessions contém uma referência de chave estrangeira, user_id. Usando esse relacionamento, criaremos um conjunto de recursos contendo várias métricas de reserva agregadas por usuário. A figura a seguir representa a consulta SQL usada para construir os recursos.

WITH session_features_by_user AS ( SELECT user_id, count(*) AS number_of_actions_taken, count(DISTINCT action_type) AS number_of_unique_actions, round(avg(secs_elapsed)) AS avg_session_time_seconds, round(max(secs_elapsed)) AS max_session_time_seconds, round(min(secs_elapsed)) AS min_session_time_seconds, ( SELECT count(*) FROM sessions s WHERE s.user_id = user_id AND s.action_type = 'booking_request') AS total_bookings FROM sessions GROUP BY user_id ) SELECT u.id AS user_id, u.gender, u.age, u .language, u.signup_method, u.date_account_created, s.number_of_actions_taken, s.number_of_unique_actions, s.avg_session_time_seconds, s.min_session_time_seconds, s.max_session_time_seconds FROM session_features_by_user s LEFT JOIN usuários u ON u.id = s.user_id LIMIT


A consulta SQL é executada em nosso pipeline do Airflow e gravada em um local S3 intermediário usando a seguinte definição de tarefa.

@task() def extract_features(sql_file: str) -> str: context = get_current_context() sql_query = Path(sql_file).read_text() key = f"{context['dag_run'].run_id}_booking_features.csv" com NamedTemporaryFile (mode="r+", suffix=".csv") as tmp_csv: postgres.copy_expert( f"copiar ({sql_query}) para stdout com cabeçalho csv", tmp_csv.name ) s3.load_file( filename=tmp_csv.name, chave=chave, ) chave de retorno


A entrada para a tarefa, sql_file, determina qual consulta será executada no banco de dados. Essa consulta será lida na tarefa e, em seguida, executada no banco de dados. Os resultados da consulta serão gravados no S3 e a chave do arquivo remoto será retornada como uma saída da tarefa.

A captura de tela abaixo mostra um conjunto de resultados de amostra da consulta de extração acima. Descreveremos como criar uma versão sintética desse conjunto de dados na próxima seção.



Visualização do resultado da consulta.

Sintetize recursos usando APIs Gretel

 
 
Para gerar uma versão sintética de cada recurso, devemos primeiro treinar um modelo sintético e, em seguida, executar o modelo para gerar registros sintéticos. A Gretel tem um conjunto de SDKs Python que facilitam a integração nas tarefas do Airflow.

Além dos SDKs do cliente Python, criamos um Gancho de fluxo de ar Gretel que gerencia conexões e segredos da API Gretel. Depois de configurar uma conexão Gretel Airflow, conectar-se à API Gretel é tão fácil quanto

from hooks.gretel import GretelHook gretel = GretelHook() projeto = gretel.get_project()


Para obter mais informações sobre como configurar conexões do Airflow, consulte nosso repositório do Github README.

A variável de projeto no exemplo acima pode ser usada como o principal ponto de entrada para treinar e executar modelos sintéticos usando a API do Gretel. Para mais detalhes, você pode conferir nosso Documentos da API Python.

Voltando ao pipeline de reservas, agora revisaremos a tarefa generate_synthetic_features. Esta etapa é responsável por treinar o modelo sintético utilizando as características extraídas na tarefa anterior.

@task() def generate_synthetic_features(data_source: str) -> str: project = gretel.get_project() model = project.create_model_obj( model_config="synthetics/default", data_source=s3.download_file(data_source) ) model.submit_cloud() poll(model) return model.get_artifact_link("data_preview")


Observando a assinatura do método, você verá que ele usa um caminho, data_source. Esse valor aponta para os recursos do S3 extraídos na etapa anterior. Em uma seção posterior, veremos como todas essas entradas e saídas são conectadas.

Ao criar o modelo usando project.create_model_obj, o parâmetro model_config representa a configuração do modelo sintético usado para gerar o modelo. Neste pipeline, estamos usando nosso configuração do modelo padrão, mas muitos outros opções de configuração estão disponíveis.

Após a configuração do modelo, chamamos model.submit_cloud(). Isso enviará o modelo para treinamento e geração de registros usando o Gretel Cloud. Chamar poll(model) bloqueará a tarefa até que o modelo tenha concluído o treinamento.

Agora que o modelo foi treinado, usaremos get_artifact_link para retornar um link para baixar os recursos sintéticos gerados.



Visualização de dados do conjunto sintético de recursos.

 

Este link de artefato será usado como uma entrada para a etapa final upload_synthetic_features.

Carregar recursos sintéticos

 
 
Os recursos originais foram extraídos e uma versão sintética foi criada. Agora é hora de fazer o upload dos recursos sintéticos para que possam ser acessados ​​pelos consumidores downstream. Neste exemplo, usaremos um bucket do S3 como destino final do conjunto de dados.

@task() def upload_synthetic_features(data_set: str): context = get_current_context() com open(data_set, "rb") como synth_features: s3.load_file_obj( file_obj=synth_features, key=f"{..._booking_features_synthetic.csv", )


Esta tarefa é bastante simples. O valor de entrada data_set contém um link HTTP assinado para baixar o conjunto de dados sintético da API do Gretel. A tarefa lerá esse arquivo no trabalhador do Airflow e, em seguida, usará o gancho S3 já configurado para fazer upload do arquivo de recurso sintético para um bucket do S3 onde os consumidores ou modelos downstream possam acessá-lo.

Orquestrando o pipeline

 
 
Nas últimas três seções, percorremos todo o código necessário para extrair, sintetizar e carregar um conjunto de dados. A última etapa é vincular cada uma dessas tarefas em um único pipeline do Airflow.

Se você se lembrar do início deste post, mencionamos brevemente o conceito de um DAG. Usando a API TaskFlow do Airflow, podemos compor esses três métodos Python em um DAG que define as entradas, saídas e a ordem em que cada etapa será executada.

feature_path = extract_features( "/opt/airflow/dags/sql/session_rollups__by_user.sql" ) synthetic_data = generate_synthetic_features(feature_path) upload_synthetic_features(synthetic_data)


Se você seguir o caminho dessas chamadas de método, eventualmente obterá um gráfico que se parece com nosso pipeline de recursos original.



Oleoduto sintético Gretel no Airflow.

 

Se você quiser executar esse pipeline e vê-lo em ação, vá para o que acompanha o repositório do Github. Lá você encontrará instruções sobre como iniciar uma instância do Airflow e executar o pipeline de ponta a ponta.

Embrulhando as coisas

 
 
Se você chegou até aqui, viu como o Gretel pode ser integrado a um pipeline de dados construído no Airflow. Ao combinar as APIs amigáveis ​​para desenvolvedores da Gretel e o poderoso sistema de ganchos e operadores da Airflow, é fácil construir pipelines ETL que tornam os dados mais acessíveis e seguros de usar.

Também falamos sobre um caso de uso comum de engenharia de recursos em que dados confidenciais podem não estar prontamente acessíveis. Ao gerar uma versão sintética do conjunto de dados, reduzimos o risco de expor quaisquer dados confidenciais, mas ainda retemos a utilidade do conjunto de dados ao mesmo tempo em que o disponibilizamos rapidamente para quem precisa.

Pensando no pipeline de recursos em termos mais abstratos, agora temos um padrão que pode ser reaproveitado para qualquer número de novas consultas SQL. Ao implantar uma nova versão do pipeline e trocar a consulta SQL inicial, podemos enfrentar qualquer consulta potencialmente sensível com um conjunto de dados sintético que preserva a privacidade do cliente. A única linha de código que precisa ser alterada é o caminho para o arquivo sql. Nenhuma engenharia de dados complexa é necessária.

Obrigado por ler

 
 
Envie-nos um e-mail para oi@gretel.ai ou venha participar conosco Slack se você tiver quaisquer perguntas ou comentários. Adoraríamos saber como você está usando o Airflow e como podemos nos integrar melhor aos seus pipelines de dados existentes.

 
Bio: Drew Newberry é engenheiro de software na Gretel.ai.

Óptimo estado. Original. Republicado com permissão.

Relacionado:

Fonte: https://www.kdnuggets.com/2021/09/build-synthetic-data-pipeline-gretel-apache-airflow.html

Carimbo de hora:

Mais de KDnuggetsGenericName