什么是ETL?
ETL 是从多个源系统提取数据、更改数据(通过计算、串联等),然后将其放入数据仓库系统的过程。 ETL 代表提取、转换和加载。
人们很容易相信构建数据仓库就像从众多来源提取数据并将其输入数据仓库数据库一样简单。 事实并非如此,需要复杂的ETL过程。 ETL 过程在技术上很复杂,需要各种利益相关者的积极参与,包括开发人员、分析师、测试人员和高级管理人员。
为了保持其作为决策工具的价值,数据仓库系统必须与业务发展同步发展。 ETL 是数据仓库系统的定期(每日、每周、每月)流程,必须敏捷、自动化且正确记录。
ETL 是如何工作的?
下面我们将逐步了解 ETL 流程是如何工作的:
步骤1)提取
数据从源系统中提取,并在提取期间放置在暂存区域中。 如果需要任何转换,它们将在暂存区域中执行,以便源系统的性能不会受到损害。 如果损坏的数据直接从源转移到数据仓库数据库中,回滚将会很困难。 在将提取的数据移动到数据仓库之前,可以在暂存区域对其进行验证。
数据仓库可以将具有不同硬件、数据库管理系统、操作系统和通信协议的系统组合起来。 数据仓库必须将具有不同 DBMS、硬件、操作系统和通信协议的系统结合起来。 来源可能包括遗留程序(例如大型机)、定制应用程序、接触点设备(例如 ATM 和呼叫交换机)、文本文件、电子表格、ERP、来自供应商和合作伙伴的数据等。
因此,在提取数据并物理加载之前,需要一个逻辑数据映射。 此数据图中显示了源数据和目标数据之间的连接。
三种数据提取方法:
- 部分提取 – 如果源系统在记录被修改时提醒您,这是获取数据的最简单方法。
- 部分提取(无更新通知) – 并非所有系统都能在发生更新时发出通知; 但是,它们可以指示已更改的记录并提供这些记录的提取。
- 完整摘录 – 某些系统根本无法确定哪些数据已更改。 在这种情况下,从系统中获取数据的唯一方法是执行完整提取。 此方法需要对之前的摘录进行备份
现有相同的格式,以便识别已完成的更改。
无论采用哪种方法,提取都不应该对源系统的性能或响应时间产生影响。 这些是实时生产数据库。 任何放缓或锁定都可能对公司的利润产生影响。
步骤2)转型
从源服务器检索到的数据是原始数据,在其原始状态下无法使用。 因此,必须对其进行清理、映射和改造。 实际上,这是 ETL 流程增加价值并转换数据以生成有意义的 BI 报告的关键步骤。
这是一个关键的 ETL 概念,您可以在其中将函数集合应用于提取的数据。 直接移动 or 通过数据a 是不需要任何转换的数据类型。
您可以在转换步骤中对数据执行自定义操作。 例如,假设客户想要数据库中不存在的销售收入总和。 或者如果表中的名字和姓氏位于不同的列中。 在加载之前,可以将它们连接起来。
以下是数据完整性问题的一些示例:
- 同一个人的不同拼写,例如 Prashant、Parshant 等。
- 公司名称的表示方法有很多种,例如Google、Google Inc.
- 使用了各种名称,例如克利夫兰和克利夫兰。
- 同一客户的不同应用程序可能会生成多个帐号。
- 一些需要数据的文件留空。
步骤3)加载
ETL 过程的最后阶段是将数据加载到目标数据仓库数据库中。 在典型的数据仓库中,大量数据会在相对较短的时间内加载。 因此,应该优化加载过程以提高性能。
在发生加载故障时,应制定恢复程序,以便可以从故障点重新启动操作,而不会影响数据完整性。 数据仓库管理员必须根据服务器性能监视、继续和停止负载。
装载类型:
- 初始负载 — 填充所有
数据仓库表 - 增量负载 — 持续实施
根据需要定期修改 - 完全刷新 — 清除内容
一个或多个表并用新数据重新加载它们
负载验证
- 检查关键字段数据是否缺失或为空。
- 应测试基于目标表的建模视图。
- 检查组合值3 和计算的测量值。
- 维度和历史表中的数据检查。
- 检查有关加载的事实和维度表的 BI 报告。
使用 PythonScript 设置 ETL
因此,您必须执行从多个数据库到数据仓库的基本提取转换加载 (ETL),以便为商业智能进行数据聚合。 您认为有多个可用的 ETL 包对于您的基本用例而言过多。
在本文中,我将向您展示如何从 MySQL、SQL-server 和 firebird 中提取数据。 使用Python 3.6,转换数据并将其加载到SQL-server(数据仓库)中。
首先,我们必须为我们的项目创建一个目录:
python_etl |__main.py |__db_credentials.py |__variables.py |__sql_queries.py |__etl.py
要使用 Python 设置 ETL,您需要在项目目录中生成以下文件.
- db_credentials.py: 应该拥有连接到所有数据库所需的所有信息。 例如数据库密码、端口号等。
- sql_queries.py: 用于提取和加载字符串格式数据的所有常用数据库查询都应该可用。
- etl.py: 连接到数据库并通过执行所有必要的过程来执行所需的查询。
- 主要.py: 负责管理操作流程并按指定顺序执行基本操作。
在 sql_queries.py 的这一部分中,这是我们将存储所有 sql 查询的地方,以便从源数据库中提取并导入到目标数据库(数据仓库)中
设置数据库凭据和变量
在variables.py中,创建一个变量来记录base的名称。
数据仓库名称 = '您的数据仓库名称'
在 db_credentials.py 中配置所有源和目标数据库连接字符串和凭据,如下所示。 将配置保存为列表,以便我们以后可以在需要时通过多个数据库迭代它。
from 变量 import datawarehouse_name datawarehouse_name = 'your_datawarehouse_name' # sql-server (目标数据库,数据仓库) datawarehouse_db_config = { 'Trusted_Connection': 'yes', 'driver': '{SQL Server}', 'server': 'datawarehouse_sql_server', ' database': '{}'.format(datawarehouse_name), 'user': 'your_db_username', 'password': 'your_db_password', 'autocommit': True, } # sql-server (源数据库) sqlserver_db_config = [ { 'Trusted_Connection ': '是', '驱动程序': '{SQL Server}', '服务器': 'your_sql_server', '数据库': 'db1', '用户': 'your_db_username', '密码': 'your_db_password', ' autocommit': True, } ] # mysql (源数据库) mysql_db_config = [ { 'user': 'your_user_1', 'password': 'your_password_1', 'host': 'db_connection_string_1', 'database': 'db_1', } , { 'user': 'your_user_2', 'password': 'your_password_2', 'host': 'db_connection_string_2', 'database': 'db_2', }, ] # firebird (源数据库) fdb_db_config = [ { 'dsn' : "/your/path/to/source.db", 'user': "your_username", 'password': "your_password", } ]
SQL 查询
在 sql_queries.py 的这一部分中,我们将存储所有 sql 查询,以便从源数据库中提取并导入到目标数据库(数据仓库)中。
我们必须为每个数据库实现不同的语法,因为我们正在使用多个数据平台。 我们可以通过根据数据库类型分离查询来做到这一点。
# 示例查询在不同的数据库平台上会有所不同 firebird_extract = (''' SELECT fbd_column_1, fbd_column_2, fbd_column_3 FROM fbd_table; ''') firebird_insert = (''' INSERT INTO table (column_1, column_2, column_3) VALUES (?, ?, ?) ''') firebird_extract_2 = (''' 从 fbd_table_1 中选择 fbd_column_2、fbd_column_3、fbd_column_2; ''') firebird_insert_2 = (''' 插入 table_2 (column_1、column_2、column_3) 值 (?, ?, ? ) ''') sqlserver_extract = (''' 从 sqlserver_table ''' 中选择 sqlserver_column_1、sqlserver_column_2、sqlserver_column_3 ''') sqlserver_insert = (''' INSERT INTO 表 (column_1、column_2、column_3) VALUES (?, ?, ?) ''' ) mysql_extract = (''' SELECT mysql_column_1, mysql_column_2, mysql_column_3 FROM mysql_table ''') mysql_insert = (''' INSERT INTO table (column_1, column_2, column_3) VALUES (?, ?, ?) ''') # 导出查询class SqlQuery: def __init__(self, extract_query, load_query): self.extract_query = extract_query self.load_query = load_query # 为 SqlQuery 创建实例 class fbd_query = SqlQuery(firebird_extract, firebird_insert) fbd_query_2 = SqlQuery(firebird_extract_2, firebird_insert_2) sqlserver_query = SqlQuery(sqlserver_extract) , sqlserver_insert) mysql_query = SqlQuery(mysql_extract, mysql_insert) # 存储为迭代列表 fbd_queries = [fbdquery, fbd_query_2] sqlserver_queries = [sqlserver_query] mysql_queries = [mysql_query]
提取 变换 加载
要使用 Python 为上述数据源设置 ETL,您需要以下模块:
# python 模块 import mysql.connector import pyodbc import fdb # 来自变量的变量 import datawarehouse_name
我们可以使用两种技术:etl() 和 etl_process()。
etl_process()是建立数据库源连接并根据数据库平台调用etl()方法的过程。
在第二种方法,即etl()方法中,它首先运行提取查询,然后将SQL数据存储在变量data中,并将其插入到目标数据库,即我们的数据仓库中。 数据转换可以通过改变类型元组的数据变量来完成。
def etl(query, source_cnx, target_cnx): # 从源数据库中提取数据 source_cursor = source_cnx.cursor() source_cursor.execute(query.extract_query) data = source_cursor.fetchall() source_cursor.close() # 将数据加载到仓库数据库 if data: target_cursor = target_cnx.cursor() target_cursor.execute("USE {}".format(datawarehouse_name)) target_cursor.executemany(query.load_query, data) print('数据加载到仓库db') target_cursor.close() else : print('data isempty') def etl_process(queries, target_cnx, source_db_config, db_platform): # 建立源数据库连接 if db_platform == 'mysql': source_cnx = mysql.connector.connect(**source_db_config) elif db_platform == 'sqlserver': source_cnx = pyodbc.connect(**source_db_config) elif db_platform == 'firebird': source_cnx = fdb.connect(**source_db_config) else: return '错误! 无法识别的数据库平台' # 循环执行sql查询 for query in requests: etl(query, source_cnx, target_cnx) # 关闭源数据库连接 source_cnx.close()
把所有东西放在一起
现在,在下一步中,我们可以循环 main.py 中的所有凭据并对所有数据库执行 etl。
为此,我们必须导入所有必需的变量和方法:
# 变量 from db_credentials import datawarehouse_db_config, sqlserver_db_config, mysql_db_config, fbd_db_config from sql_queries import fbd_queries, sqlserver_queries, mysql_queries from variables import * # 方法 from etl import etl_process
此文件中的代码负责迭代凭据,以便连接到数据库并使用 Python 操作执行必要的 ETL。
def main(): print('starting etl') # 为目标数据库(sql-server)建立连接 target_cnx = pyodbc.connect(**datawarehouse_db_config) # 循环访问凭证 # mysql_db_config 中的 mysql 配置: try: print("loading db: " + config['database']) etl_process(mysql_queries, target_cnx, config, 'mysql') except 异常为错误: print("etl for {} has error".format(config['database'])) print ('错误消息: {}'.format(error)) continue # sql-server for config in sqlserver_db_config: try: print("loading db: " + config['database']) etl_process(sqlserver_queries, target_cnx, config, ' sqlserver') except Exception as error: print("etl for {} has error".format(config['database'])) print('错误消息: {}'.format(error)) continue # firebird for config in fbd_db_config: try: print("loading db: " + config['database']) etl_process(fbd_queries, target_cnx, config, 'firebird') except Exception as error: print("etl for {} has error".format(config ['数据库'])) print('错误消息: {}'.format(error)) continue target_cnx.close() if __name__ == "__main__": main()
在终端中,输入 python main.py,您刚刚使用纯 python 脚本创建了一个 ETL。
ETL工具
市场上有多种数据仓库工具。 以下是一些最著名的例子:
1. 马克逻辑:
MarkLogic 是一个数据仓库系统,它使用一系列业务功能来使数据集成变得更容易、更快捷。 它可以查询多种类型的数据,例如文档、关系和元数据。
https://www.marklogic.com/product/getting-started/
2. 甲骨文:
Oracle 是业界最受欢迎的数据库。 它为本地和云服务提供多种数据仓库解决方案。 它通过提高运营效率来帮助改善客户体验。
https://www.oracle.com/index.html
3.亚马逊红移:
Redshift 是 Amazon 的数据仓库解决方案。 它是一种简单且经济高效的解决方案,可使用标准 SQL 和现有商业智能工具分析各种类型的数据。 它还支持对 PB 级结构化数据执行复杂查询。
https://aws.amazon.com/redshift/?nc2=h_m1
结论
本文让您深入了解 ETL 是什么,以及如何在 Python 中设置 ETL 的分步教程。 它还为您提供了当今大多数组织用来构建 ETL 数据管道的最佳工具列表。
另一方面,当今大多数组织都拥有大量具有高度动态结构的数据。 从头开始为此类数据创建 ETL 管道是一个困难的过程,因为组织必须使用大量资源来创建此管道,然后确保它能够跟上高数据量和架构更改。
关于作者
普拉山特·夏尔马
目前,我正在韦洛尔理工学院攻读技术学士学位(B.Tech)。 我对编程及其实际应用非常热衷,包括软件开发、机器学习、深度学习和数据科学。
我希望你喜欢这篇文章。 如果您想与我联系,可以通过以下方式联系:
或有任何其他疑问,您可以 发邮件 对我也
相关
- '
- "
- 账号管理
- 要积极。
- 所有类型
- Amazon
- 分析
- 应用领域
- 国家 / 地区
- 刊文
- 自动化
- 备份工具
- 提高
- 建立
- 建筑物
- 商业
- 商业智能
- 呼叫
- 支票
- 克利夫兰
- 云端技术
- 云服务
- 码
- 沟通
- 公司
- 地都
- 继续
- 创造
- 资历
- data
- 数据集成
- 数据科学
- 数据仓库
- 数据仓库
- 数据库
- 数据库
- 深入学习
- 开发
- 开发
- 研发支持
- 设备
- 尺寸
- 文件
- 司机
- 效率
- 等
- 执行
- 管理人员
- 体验
- 萃取
- 提取物
- 失败
- 姓氏:
- 流
- 格式
- 新鲜
- ,
- 谷歌
- 指南
- 硬件
- 点击此处
- 高
- 历史
- 创新中心
- How To
- HTTPS
- 鉴定
- 影响力故事
- 输入
- 包含
- 信息
- 积分
- 房源搜索
- 问题
- IT
- 键
- 大
- 学习用品
- 学习
- Line
- 清单
- 加载
- 机器学习
- 颠覆性技术
- 地图
- 市场
- 媒体
- 最受欢迎的产品
- 移动
- 名称
- 通知
- 数字
- 优惠精选
- 操作
- 操作系统
- 运营
- 神谕
- 秩序
- 组织
- 其他名称
- 伙伴
- 密码
- 性能
- 平台
- 平台
- 热门
- 生成
- 生产
- 代码编程
- 训练课程
- 项目
- 拉
- 蟒蛇
- 原
- 实时的
- 现实
- 记录
- 恢复
- 关系
- 业务报告
- 资源
- 响应
- 收入
- 科学
- 特色服务
- 集
- 短
- 简易
- So
- 软件
- 软件开发
- 解决方案
- SQL
- 阶段
- 州/领地
- 商店
- 商店
- 系统
- 产品
- 目标
- 科技
- 技术
- 专业技术
- 次
- 转型
- 教程
- 更新
- 折扣值
- 厂商
- 体积
- 仓库保管
- 仓储服务
- 每周
- 工作
- 合作