使用 Gretel 和 Apache Airflow 构建合成数据管道

源节点: 1068200

使用 Gretel 和 Apache Airflow 构建合成数据管道

在这篇博文中,我们构建了一个 ETL 管道,该管道使用 Gretel 的 Synthetic Data API 和 Apache Airflow 从 PostgreSQL 数据库生成合成数据。


By 德鲁·纽伯里, Gretel.ai 的软件工程师

使用 Gretel 和 Apache Airflow 构建合成数据管道

大家好,我叫 Drew,是 Gretel 的一名软件工程师。 我最近一直在考虑将 Gretel API 集成到现有工具中的模式,以便轻松构建数据管道,其中安全性和客户隐私是一流的功能,而不仅仅是事后的想法或检查框。

在 Gretel 工程师和客户中流行的一种数据工程工具是 Apache Airflow。 它也恰好适用于 Gretel。 在这篇博文中,我们将向您展示如何使用 Airflow、Gretel 和 PostgreSQL 构建合成数据管道。 让我们跳进去!

什么是气流

 
 
气流 是一种工作流自动化工具,通常用于构建数据管道。 它使数据工程师或数据科学家能够使用 Python 和其他熟悉的结构以编程方式定义和部署这些管道。 Airflow 的核心是 DAG 或有向无环图的概念。 Airflow DAG 提供了一个模型和一组 API,用于定义管道组件、它们的依赖关系和执行顺序。

您可能会发现 Airflow 管道将数据从产品数据库复制到数据仓库。 其他管道可能会执行将规范化数据连接到适合分析或建模的单个数据集的查询。 另一个管道可能会发布汇总关键业务指标的每日报告。 这些用例共有一个共同主题:协调跨系统的数据移动。 这就是气流闪耀的地方。

利用 Airflow 及其丰富的生态系统 集成,数据工程师和科学家可以将任意数量的不同工具或服务编排到一个易于维护和操作的统一管道中。 了解了这些集成功能后,我们现在开始讨论如何将 Gretel 集成到 Airflow 管道中以改进常见的数据操作工作流程。

格蕾特如何适应?

 
 
在 Gretel,我们的使命是让数据更容易、更安全地使用。 与客户交谈时,我们经常听到的一个痛点是让数据科学家访问敏感数据所需的时间和精力。 使用 格蕾特合成材料,我们可以通过生成数据集的合成副本来降低处理敏感数据的风险。 通过将 Gretel 与 Airflow 集成,可以创建自助式管道,使数据科学家能够轻松快速地获取所需的数据,而无需数据工程师来处理每个新的数据请求。

为了演示这些功能,我们将构建一个 ETL 管道,从数据库中提取用户活动特征,生成数据集的合成版本,并将数据集保存到 S3。 通过保存在 S3 中的合成数据集,数据科学家可以在不损害客户隐私的情况下将其用于下游建模或分析。

首先,让我们先鸟瞰一下管道。 此图中的每个节点都代表一个管道步骤,或 Airflow 术语中的“任务”。



Airflow 上的 Gretel 合成管道示例。

 

我们可以将管道分为 3 个阶段,类似于您在 ETL 管道中可能会发现的内容:

  • 提取 – extract_features 任务将查询数据库,并将数据转换为一组可供数据科学家用于构建模型的特征。
  • 合成 – generate_synthetic_features 将提取的特征作为输入,训练合成模型,然后使用 Gretel API 和云服务生成一组合成特征。
  • 加载 – upload_synthetic_features 将合成的特征集保存到 S3 中,可以将其摄取到任何下游模型或分析中。

在接下来的几节中,我们将更详细地介绍这三个步骤中的每一个。 如果您希望跟随每个代码示例,您可以前往 gretelai/gretel-气流管道 并下载此博文中使用的所有代码。 该 repo 还包含您可以遵循的说明来启动 Airflow 实例并端到端运行管道。

此外,在我们剖析每个组件之前,完整地查看 Airflow 管道可能会有所帮助, dags/airbnb_user_bookings.py. 以下部分中的代码片段是从链接的用户预订管道中提取的。

提取特征

 
 
第一个任务 extract_features 负责从源数据库中提取原始数据并将其转换为一组特征。 这是一个常见的 特征工程 您可能会在任何机器学习或分析管道中发现问题。

在我们的示例管道中,我们将提供一个 PostgreSQL 数据库并使用来自 Airbnb Kaggle 竞赛.

该数据集包含两个表,用户和会话。 Sessions 包含一个外键引用,user_id。 使用这种关系,我们将创建一组包含按用户聚合的各种预订指标的特征。 下图表示用于构建功能的 SQL 查询。

WITH session_features_by_user AS (SELECT user_id, count(*) AS number_of_actions_taken, count(DISTINCT action_type) AS number_of_unique_actions, round(avg(secs_elapsed)) AS avg_session_time_seconds, round(max(secs_elapsed)) AS max_session_time_seconds, round(min(secs_elapsed)) AS min_session_time_seconds, (SELECT count(*) FROM sessions s WHERE s.user_id = user_id AND s.action_type = 'booking_request') AS total_bookings FROM sessions GROUP BY user_id ) SELECT u.id AS user_id, u.gender, u.age, u .language, u.signup_method, u.date_account_created, s.number_of_actions_taken, s.number_of_unique_actions, s.avg_session_time_seconds, s.min_session_time_seconds, s.max_session_time_seconds FROM session_features_by_user s LEFT JOIN users u ON u.id = s.user_id LIMIT 5000


然后从我们的 Airflow 管道执行 SQL 查询,并使用以下任务定义将其写入中间 S3 位置。

@task() def extract_features(sql_file: str) -> str: context = get_current_context() sql_query = Path(sql_file).read_text() key = f"{context['dag_run'].run_id}_booking_features.csv" with NamedTemporaryFile (mode="r+", suffix=".csv") as tmp_csv: postgres.copy_expert( f"copy ({sql_query}) to stdout with csv header", tmp_csv.name) s3.load_file(filename=tmp_csv.name, key=key, ) 返回键


任务的输入 sql_file 确定要在数据库上运行的查询。 该查询将被读入任务,然后针对数据库执行。 然后查询的结果将被写入 S3,远程文件密钥将作为任务的输出返回。

下面的屏幕截图显示了上面提取查询的示例结果集。 我们将在下一节中描述如何创建此数据集的合成版本。



查询结果预览。

使用 Gretel API 合成特征

 
 
要生成每个特征的合成版本,我们必须首先训练一个合成模型,然后运行该模型以生成合成记录。 Gretel 有一组 Python SDK,可以轻松集成到 Airflow 任务中。

除了 Python 客户端 SDK 之外,我们还创建了一个 Gretel 气流挂钩 管理 Gretel API 连接和秘密。 设置 Gretel 气流连接后,连接到 Gretel API 就像

从 hooks.gretel 导入 GretelHook gretel = GretelHook() project = gretel.get_project()


有关如何配置 Airflow 连接的更多信息,请参阅我们的 Github 存储库 读我.

上面示例中的项目变量可以用作使用 Gretel 的 API 训练和运行合成模型的主要入口点。 有关更多详细信息,您可以查看我们的 Python API 文档.

回到预订管道,我们现在将查看 generate_synthetic_features 任务。 这一步负责使用上一个任务中提取的特征来训练合成模型。

@task() def generate_synthetic_features(data_source: str) -> str: project = gretel.get_project() model = project.create_model_obj(model_config="synthetics/default", data_source=s3.download_file(data_source)) model.submit_cloud()轮询(模型)返回模型.get_artifact_link(“data_preview”)


查看方法签名,您会看到它采用路径 data_source。 该值指向上一步中提取的 S3 特征。 在后面的部分中,我们将介绍所有这些输入和输出是如何连接在一起的。

使用 project.create_model_obj 创建模型时,model_config 参数表示用于生成模型的合成模型配置。 在这个管道中,我们使用我们的 默认模型配置, 但许多其他 配置选项 是可用的。

模型配置完成后,我们调用 model.submit_cloud()。 这将提交模型以使用 Gretel Cloud 进行训练和记录生成。 调用 poll(model) 将阻塞任务,直到模型完成训练。

现在模型已经训练好了,我们将使用 get_artifact_link 返回一个链接来下载生成的合成特征。



合成特征集的数据预览。

 

此工件链接将用作最终 upload_synthetic_features 步骤的输入。

加载综合特征

 
 
原始特征已被提取,并已创建合成版本。 现在是上传合成特征的时候了,以便下游消费者可以访问它们。 在此示例中,我们将使用 S3 存储桶作为数据集的最终目的地。

@task() def upload_synthetic_features(data_set: str): context = get_current_context() with open(data_set, "rb") as synth_features: s3.load_file_obj( file_obj=synth_features, key=f"{..._booking_features_synthetic.csv", )


这个任务非常简单。 data_set 输入值包含一个签名的 HTTP 链接,用于从 Gretel 的 API 下载合成数据集。 该任务将该文件读入 Airflow 工作线程,然后使用已配置的 S3 挂钩将合成特征文件上传到下游消费者或模型可以访问的 S3 存储桶。

编排管道

 
 
在过去的三个部分中,我们浏览了提取、合成和加载数据集所需的所有代码。 最后一步是将这些任务中的每一个绑定到一个 Airflow 管道中。

如果您回想起这篇文章的开头,我们简要提到了 DAG 的概念。 使用 Airflow 的 TaskFlow API,我们可以将这三个 Python 方法组合成一个 DAG,它定义了每个步骤的输入、输出和运行顺序。

特征路径=提取特征(“/opt/airflow/dags/sql/session_rollups__by_user.sql”)合成数据=生成合成特征(特征路径)上传合成特征(合成数据)


如果你按照这些方法调用的路径,你最终会得到一个看起来像我们原来的特征管道的图。



气流上的 Gretel 合成管道。

 

如果您想运行此管道并查看它的运行情况,请转到 随附的 Github 存储库. 在那里,您将找到有关如何启动 Airflow 实例并端到端运行管道的说明。

把事情包起来

 
 
如果您已经做到了这一点,那么您已经了解了如何将 Gretel 集成到基于 Airflow 的数据管道中。 通过将 Gretel 的开发人员友好 API 与 Airflow 强大的钩子和操作符系统相结合,可以轻松构建 ETL 管道,使数据更易于访问和使用更安全。

我们还讨论了一个常见的特征工程用例,其中敏感数据可能不容易访问。 通过生成数据集的合成版本,我们降低了暴露任何敏感数据的风险,但仍然保留了数据集的实用性,同时让需要它的人可以快速使用它。

用更抽象的术语来考虑特性管道,我们现在有了一个可以重新用于任意数量的新 SQL 查询的模式。 通过部署新版本的管道并替换初始 SQL 查询,我们可以使用保护客户隐私的合成数据集来处理任何可能敏感的查询。 唯一需要更改的代码行是 sql 文件的路径。 无需复杂的数据工程。

谢谢阅读

 
 
发送电子邮件至 嗨@gretel.ai 或加入我们 松弛 如果您有任何问题或意见。 我们很想听听您如何使用 Airflow,以及我们如何最好地与您现有的数据管道集成。

 
简介: 德鲁·纽伯里 是 Gretel.ai 的一名软件工程师。

原版。 经许可重新发布。

相关新闻:

来源:https://www.kdnuggets.com/2021/09/build-synthetic-data-pipeline-gretel-apache-airflow.html

时间戳记:

更多来自 掘金队