适用于 Apache Airflow 的 Amazon 托管工作流 (Amazon MWAA) 是一种托管编排服务,适用于 阿帕奇气流 这使得在云中大规模设置和操作端到端数据管道变得简单。 数据管道是一组任务和流程,用于在不同系统之间自动移动和转换数据。 Apache Airflow 开源社区为 Apache Airflow 提供了 1,000 多个预建运算符(简化服务连接的插件)来构建数据管道。 这 亚马逊供应商套餐 Apache Airflow 集成了超过 31 种 AWS 服务,例如 亚马逊简单存储服务 (Amazon S3), 亚马逊Redshift, 亚马逊电子病历, AWS胶水, 亚马逊SageMaker等等。
Airflow 最常见的用例是 ETL(提取、转换和加载)。 几乎所有 Airflow 用户都实现了从简单到复杂的 ETL 管道。 操作机器学习 (ML) 是另一个不断增长的用例,其中数据必须先进行转换和规范化,然后才能加载到 ML 模型中。 在这两种用例中,数据管道通过从不同来源摄取数据并通过一系列步骤对其进行转换来准备数据以供使用。
数据管道内不同进程的可观察性是监控管道成功或失败的关键组件。 虽然在数据管道内调度任务的运行是由 Airflow 控制的,但任务本身的运行(转换、规范化和聚合数据)是由基于用例的不同服务完成的。 由于数据管道中有多个接触点,因此拥有端到端的数据流视图是一项挑战。
在本文中,我们概述了使用 Amazon MWAA 时的日志记录增强功能,这是可观察性的支柱之一。 然后,我们讨论通过修改构成数据管道的任务定义来进一步增强端到端可观察性的解决方案。 在这篇文章中,我们重点关注两种服务的任务定义:AWS Glue 和 Amazon EMR,但是相同的方法可以应用于不同的服务。
挑战
许多客户的数据管道开始时很简单,编排一些任务,随着时间的推移变得越来越复杂,由大量任务和它们之间的依赖关系组成。 随着复杂性的增加,在发生故障时操作和调试变得越来越困难,这就需要单一管理平台来提供端到端的数据管道编排和健康管理。 对于数据管道编排, 阿帕奇气流用户界面 是一种用户友好的工具,可提供对数据管道的详细视图。 在管道健康管理方面,您的任务与之交互的每个服务都可能将日志存储或发布到不同的位置,例如 S3 存储桶或 亚马逊CloudWatch 日志。 随着集成接触点数量的增加,拼接由不同位置的不同服务生成的分布式日志可能具有挑战性。
Amazon MWAA 提供的一种解决方案,用于将 Airflow 和任务日志整合到 有向无环图(DAG) 是将日志转发到 CloudWatch 日志组. 为每个启用的 Airflow 日志记录选项创建一个单独的日志组(例如, DAGProcessing
, Scheduler, Task
, WebServer
及 Worker
). 这些日志可以跨日志组查询 使用 CloudWatch Logs Insights。
分布式追踪中常用的一种方式是使用关联ID来拼接和查询分布式日志。 关联 ID 是通过请求流传递的唯一标识符,用于在工作流的整个生命周期中跟踪一系列活动。 当工作流中的每个服务都需要记录信息时,它可以包含此关联 ID,从而确保您可以从头到尾跟踪完整的请求。
Airflow引擎通过了一些 变量 默认情况下,所有模板都可以访问。 运行ID 就是这样一个变量,它是 DAG 运行的唯一标识符。 这 run_id
可以用作关联 ID 来查询 CloudWatch 中的不同日志组,以捕获特定 DAG 运行的所有日志。
但是,请注意,您的任务与之交互的服务将使用单独的日志组,并且不会记录 run_id
作为他们输出的一部分。 这将阻止您获得整个 DAG 运行的端到端视图。
例如,如果您的数据管道包含作为管道一部分运行 Spark 作业的 AWS Glue 任务,则 Airflow 任务日志将在一个 CloudWatch 日志组中可用,而 AWS Glue 作业日志将在不同的 CloudWatch 日志组中. 但是,作为 AWS Glue 作业的一部分运行的 Spark 作业无权访问关联 ID,也无法绑定回特定的 DAG 运行。 因此,即使您使用关联 ID 查询不同的 CloudWatch 日志组,您也不会获得有关 Spark 作业运行的任何信息。
解决方案概述
如您所知, run_id
是一个变量,它是 DAG 运行的唯一标识符。 这 run_id
作为 Airflow 任务日志的一部分存在。 使用 run_id
有效地增加整个 DAG 运行的可观察性,我们使用 run_id
作为相关 ID 并将其传递给 DAG 的不同任务。 然后相关 ID 由任务中使用的脚本使用。
下图说明了解决方案体系结构。
我们关注的数据管道由以下组件组成:
- 包含源数据的 S3 存储桶
- An AWS Glue搜寻器 从源数据在数据目录中创建表元数据
- An AWS Glue作业 在执行文件格式转换的同时将原始数据转换为经过处理的数据格式
- An 电子病历作业 生成报告数据集
有关架构的详细信息以及如何运行 DAG 的完整步骤,请参阅 亚马逊 MWAA 分析研讨会.
在接下来的部分中,我们将探讨以下主题:
- DAG 文件,以了解如何在 AWS Glue 和 EMR 任务中定义并传递关联 ID
- Python 脚本中根据关联 ID 输出信息所需的代码
参考 GitHub回购 详细的 DAG 定义和 Spark 脚本。 要运行脚本,请参阅 亚马逊 MWAA 分析研讨会.
DAG 定义
在本节中,我们将查看 DAG 文件所需添加内容的片段。 我们还讨论了如何将关联 ID 传递给 AWS Glue 和 EMR 作业。 请参阅 GitHub回购 完整的 DAG 代码。
DAG 文件首先定义变量:
#变量
接下来,让我们看看如何使用 AWS Glue 运算符将关联 ID 传递给 AWS Glue 作业。 运算符是 Airflow DAG 的构建块。 它们包含如何在数据管道中处理数据的逻辑。 DAG 中的每个任务都是通过实例化一个运算符来定义的。
Airflow 为不同的任务提供操作员。 对于这篇文章,我们使用 AWS Glue 运算符.
AWS Glue 任务定义包含以下内容:
- Python Spark 作业脚本 (raw_to_transform.py) 运行作业
- 作为参数传递的 DAG 名称、任务 ID 和关联 ID
- AWS Glue 服务角色 已分配,具有运行爬虫和作业的权限
请参见以下代码:
# 胶水任务定义
接下来,我们使用 电子病历操作员. 这包括以下步骤:
- 定义 EMR 集群的配置。
- 创建 EMR 集群。
- 定义 EMR 作业要运行的步骤。
- 运行 EMR 作业:
- 我们使用 Python Spark 作业脚本 聚合.py.
- 我们将 DAG 名称、任务 ID 和关联 ID 作为参数传递给 EMR 任务的步骤。
让我们从定义 EMR 集群的配置开始。 这 correlation_id
在集群的名称中传递,以便轻松识别与 DAG 运行对应的集群。 EMR 作业生成的日志发布到 S3 存储桶; 这 correlation_id
是的一部分 LogUri
以及。 请参见以下代码:
#定义EMR集群配置
现在让我们根据配置定义创建 EMR 集群的任务:
# 创建 EMR 集群
cluster_creator = EmrCreateJobFlowOperator( task_id= emr_task_id, job_flow_overrides=JOB_FLOW_OVERRIDES, aws_conn_id=’aws_default’, emr_conn_id=’emr_default’, dag=dag
)
接下来,让我们定义作为 EMR 作业的一部分运行所需的步骤。 EMR 作业处理的输入和输出数据存储在作为参数传递的 S3 存储桶中。 Dag_name
, task_id
及 correlation_id
也作为参数传入。 使用的 task_id 可以是您选择的名称; 在这里我们使用 add_steps
:
# EMR 集群要执行的 EMR 步骤
接下来,让我们添加一个任务来在 EMR 集群上运行这些步骤。 这 job_flow_id
是 JobFlow
, 这是从 EMR 传下来的 create task
前面描述过使用 气流XComs。 请参见以下代码:
#运行 EMR 作业
这样就完成了在 DAG 任务定义中传递关联 ID 所需的步骤。
在下一节中,我们将在脚本运行中使用此 ID 来记录详细信息。
作业脚本定义
在本节中,我们将根据以下内容审查记录信息所需的更改 correlation_id
. 让我们从 AWS Glue 作业脚本开始(完整代码参考下面 文件 在 GitHub 中):
# 脚本更改文件 'raw_to_transform'
接下来,我们重点关注 EMR 作业脚本(完整代码请参考 文件 在 GitHub 中):
# 脚本更改文件 'nyc_aggregations'
这样就完成了将关联 ID 传递给脚本运行的步骤。
在我们完成 DAG 定义和脚本添加之后,我们就可以运行 DAG。 可以使用关联 ID 查询特定 DAG 运行的日志。 可以通过以下方式找到 DAG 运行的相关 ID 气流用户界面. 关联 ID 的示例是 manual__2022-07-12T00:22:36.111190+00:00
. 有了这个唯一的字符串,我们就可以使用 CloudWatch Logs Insights 对相关的 CloudWatch 日志组运行查询。 查询结果包括 AWS Glue 和 EMR 脚本提供的日志记录,以及与关联 ID 关联的其他日志。
DAG 级别日志的示例查询: manual__2022-07-12T00:22:36.111190+00:00
我们还可以使用格式获取任务级别的日志 <dag_name.task_id correlation_id>
:
示例查询: data_pipeline.glue_task manual__2022-07-12T00:22:36.111190+00:00
清理
如果您创建了使用以下命令运行和测试脚本的设置 亚马逊 MWAA 分析研讨会,执行 净化 避免产生费用的步骤。
结论
在本文中,我们展示了如何将 Amazon MWAA 日志发送到 CloudWatch 日志组。 然后,我们讨论了如何使用唯一的关联 ID 将来自不同任务的日志绑定到 DAG 中。 关联 ID 可以输出您的作业所需的尽可能多或尽可能少的信息,以在整个 DAG 运行中提供更多详细信息。 然后,您可以使用 CloudWatch Logs Insights 查询日志。
借助此解决方案,您可以将 Amazon MWAA 用作数据管道编排的单一管理平台,并将 CloudWatch 日志用作数据管道健康管理。 唯一标识符提高了 DAG 运行的端到端可观察性,并有助于减少故障排除所需的时间。
要了解更多信息并获得实践经验,请从 亚马逊 MWAA 分析研讨会 然后使用脚本中的 GitHub回购 获得更多 DAG 运行的可观察性。
关于作者
辛格 是 Amazon Web Services 的合作伙伴解决方案架构师,专注于无服务器平台。 她负责帮助合作伙伴和客户实现应用程序现代化并将其迁移到 AWS。
- SEO 支持的内容和 PR 分发。 今天得到放大。
- 柏拉图区块链。 Web3 元宇宙智能。 知识放大。 访问这里。
- Sumber: https://aws.amazon.com/blogs/big-data/improve-observability-across-amazon-mwaa-tasks/
- 000
- 1
- 10
- 100
- 11
- a
- 关于
- ACCESS
- 无障碍
- 横过
- 活动
- 无环
- 增加
- 驳
- 所有类型
- 尽管
- Amazon
- 亚马逊网络服务
- 分析
- 和
- 另一个
- 阿帕奇
- 应用领域
- 应用的
- 的途径
- 架构
- 参数
- 分配
- 相关
- 自动化
- 可使用
- 避免
- AWS
- AWS胶水
- 背部
- 基于
- 成为
- before
- 之间
- 吹氣梢
- 建立
- 建设者
- 建筑物
- 捕获
- 案件
- 例
- 检索目录
- 挑战
- 挑战
- 更改
- 收费
- 选择
- 云端技术
- 簇
- 码
- 相当常见
- 社体的一部分
- 完成
- 完成对
- 复杂
- 复杂
- 元件
- 组件
- 配置
- 连接
- 组成
- 巩固
- 消费
- 消费
- 包含
- 受控
- 核心
- 相关
- 相应
- 可以
- 履带
- 创建信息图
- 创建
- 创建
- 合作伙伴
- DAG
- data
- 默认
- 定义
- 定义
- 描述
- 详细
- 详情
- 不同
- 讨论
- 讨论
- 分布
- 不会
- 向下
- 每
- 此前
- 容易
- 只
- 启用
- 端至端
- 发动机
- 保证
- 整个
- 醚(ETH)
- 甚至
- 例子
- 体验
- 探索
- 提取
- 失败
- 少数
- 文件
- 完
- 流
- 专注焦点
- 重点
- 以下
- 格式
- 向前
- 发现
- 止
- ,
- 功能
- 进一步
- Gain增益
- 产生
- 产生
- 得到
- 越来越
- GitHub上
- 玻璃
- 图形
- 团队
- 组的
- 增长
- 成长
- 动手
- 硬
- 有
- 健康管理
- 帮助
- 帮助
- 此处
- 创新中心
- How To
- 但是
- HTML
- HTTPS
- 识别码
- 鉴定
- 实施
- 进口
- 改善
- 提高
- in
- 包括
- 包括
- 增加
- 增加
- 日益
- 信息
- 输入
- 可行的洞见
- 积分
- 集成
- 互动
- IT
- 本身
- 工作
- 工作机会
- 键
- 知道
- 大
- 学习用品
- 学习
- Level
- 一生
- 小
- 加载
- 地点
- 日志4j
- 看
- 机
- 机器学习
- 使
- 制作
- 管理
- 颠覆性技术
- 市场
- 主
- 元数据
- 方法
- 迁移
- ML
- 模型
- 模型
- 现代化
- 显示器
- 更多
- 最先进的
- 运动
- 多
- 姓名
- 几乎
- 需求
- 打印车票
- 需要
- 下页
- 节点
- 数
- 获得
- 一
- 开放源码
- 操作
- 操作者
- 运营商
- 附加选项
- 管弦乐编曲
- 秩序
- 其他名称
- 简介
- 面包
- 部分
- 特别
- 合伙人
- 通过
- 通行证
- 通过
- 演出
- 执行
- 权限
- 管道
- 平台
- 柏拉图
- 柏拉图数据智能
- 柏拉图数据
- 插件
- 点
- 帖子
- 准备
- 当下
- 防止
- 过程
- 提供
- 提供
- 提供者
- 供应商
- 提供
- 出版
- 出版
- 蟒蛇
- 范围
- 原
- 原始数据
- 减少
- 相应
- 报告
- 请求
- 必须
- 提供品牌战略规划
- 导致
- 检讨
- 运行
- 运行
- 同
- SC
- 鳞片
- 脚本
- 部分
- 部分
- 分开
- 序列
- 系列
- 无服务器
- 服务
- 特色服务
- 会议
- 集
- 格局
- 简易
- 简化
- 单
- So
- 方案,
- 解决方案
- 来源
- 来源
- 火花
- SQL
- 开始
- 开始
- 步骤
- 存储
- 存储
- 成功
- 这样
- 产品
- 表
- 任务
- 任务
- 模板
- test
- 其
- 从而
- 通过
- 始终
- 领带
- 绑
- 次
- 至
- 工具
- Topics
- 触摸
- 追踪
- 跟踪时
- 跟踪
- 改造
- 转型
- 转化
- 转型
- true
- 理解
- 独特
- 用法
- 使用
- 用例
- 用户友好
- 用户
- 各个
- 通过
- 查看
- 意见
- 卷筒纸
- Web服务
- 这
- 而
- 将
- 中
- 工作流程
- 工作流程
- 加工
- 工作坊
- 您一站式解决方案
- 和风网