使用 Amazon Redshift SQLAlchemy 方言与 Amazon Redshift 交互

源节点: 1570489

亚马逊Redshift 是一个快速、可扩展、安全且完全托管的云数据仓库,使您能够大规模分析数据。 您可以通过多种不同方式与 Amazon Redshift 数据库进行交互。 一种方法是使用对象关系映射 (ORM) 框架。 ORM 被开发人员广泛用作数据库的抽象层,它允许您使用首选的编程语言编写代码,而不是编写 SQL。 SQLAlchemy的 是一个流行的 Python ORM 框架,它支持 Python 代码和数据库之间的交互。

SQLAlchemy 方言是用于与各种类型的 DBAPI 实现和数据库进行通信的系统。 以前,Amazon Redshift 的 SQLAlchemy 方言使用 psycopg2 与数据库通信。 因为 psycopg2 是一个 Postgres 连接器,它不支持 Amazon Redshift 特定的功能,例如 AWS身份和访问管理 (IAM) 安全连接和 Amazon Redshift 特定数据类型(例如 SUPER 和 GEOMETRY)身份验证。 新的 Amazon Redshift SQLAlchemy 方言使用 Amazon Redshift Python 驱动程序(redshift_connector) 并让您安全地连接到您的 Amazon Redshift 数据库。 它原生支持 IAM 身份验证和单点登录 (SSO)。 它还支持 Amazon Redshift 特定的数据类型,例如 SUPER、GEOMETRY、TIMESTAMPTZ 和 TIMETZ。

在本文中,我们将讨论如何使用新的 Amazon Redshift SQLAlchemy 方言与 Amazon Redshift 数据库进行交互。 我们演示了如何使用 Okta 安全连接并执行各种 DDL 和 DML 操作。 因为新的 Amazon Redshift SQLAlchemy 方言使用 redshift_connector,此软件包的用户可以充分利用由 redshift_connector,例如通过 IAM 和身份提供商 (IdP) 插件进行身份验证。 此外,我们还展示了对 IPython 的支持 SqlMagic,这简化了直接从 Jupyter 笔记本运行交互式 SQL 查询。

先决条件

以下是这篇文章的先决条件:

开始使用 Amazon Redshift SQLAlchemy 方言

使用适用于 Python 的 Amazon Redshift SQLAlchemy 方言很容易上手。 您可以安装 sqlalchemy-redshift 使用 pip 的库。 为了证明这一点,我们从一个 Jupyter notebook 开始。 完成以下步骤:

  1. 创建一个笔记本实例 (对于这篇文章,我们称之为 redshift-sqlalchemy).
  2. 在Amazon SageMaker控制台上的 笔记本 在导航窗格中,选择 笔记本实例.
  3. 找到您创建的实例并选择 打开Jupyter.
  4. 打开您的笔记本实例并创建一个新的 conda_python3 Jupyter笔记本。
  5. 运行以下命令进行安装 sqlalchemy-redshiftredshift_connector:
pip install sqlalchemy-redshift
pip install redshift_connector


redshift_connector 提供了许多不同的连接选项,可帮助自定义您访问 Amazon Redshift 集群的方式。 有关详细信息,请参阅 连接参数.

连接到您的 Amazon Redshift 集群

在此步骤中,我们将向您展示如何使用两种不同的方法连接到您的 Amazon Redshift 集群:Okta SSO 联合以及使用您的数据库用户和密码的直接连接。

与 Okta SSO 联合连接

作为先决条件,请在 Okta 配置中设置您的 Amazon Redshift 应用程序。 有关详细信息,请参阅 将 Amazon Redshift 访问与 Okta 作为身份提供者联合.

为了建立与 Amazon Redshift 集群的连接,我们使用 create_engine 功能。 SQLAlchemy create_engine() 函数根据 URL 生成引擎对象。 这 sqlalchemy-redshift 包提供了一个自定义界面,用于创建符合 RFC-1738 标准的 URL,您可以使用该 URL 建立与 Amazon Redshift 集群的连接。

我们构建 SQLAlchemy URL,如以下代码所示。 URL.create() 适用于 SQLAlchemy 1.4 及以上版本。 使用 IAM 进行身份验证时,用户无需指定主机和端口。 为了使用 SSO 联合安全地连接 Amazon Redshift,我们在 URL 中使用 Okta 用户名和密码。

import sqlalchemy as sa
from sqlalchemy.engine.url import URL
from sqlalchemy import orm as sa_orm from sqlalchemy_redshift.dialect import TIMESTAMPTZ, TIMETZ # build the sqlalchemy URL. When authenticating using IAM, the host
# and port do not need to be specified by the user.
url = URL.create(
drivername='redshift+redshift_connector', # indicate redshift_connector driver and dialect will be used
database='dev', # Amazon Redshift database
username='johnd@example.com', # Okta username
password='<PWD>' # Okta password
) # a dictionary is used to store additional connection parameters
# that are specific to redshift_connector or cannot be URL encoded.
conn_params = { "iam": True, # must be enabled when authenticating via IAM "credentials_provider": "OktaCredentialsProvider", "idp_host": "<prefix>.okta.com", "app_id": "<appid>", "app_name": "amazon_aws_redshift", "region": "<region>", "cluster_identifier": "<clusterid>", "ssl_insecure": False, # ensures certificate verification occurs for idp_host
} engine = sa.create_engine(url, connect_args=conn_params)

使用 Amazon Redshift 数据库用户和密码连接

您可以使用您的数据库用户和密码连接到您的 Amazon Redshift 集群。 我们构建一个 URL 并使用 URL.create() 构造函数,如下代码所示:

import sqlalchemy as sa
from sqlalchemy.engine.url import URL # build the sqlalchemy URL
url = URL.create(
drivername='redshift+redshift_connector', # indicate redshift_connector driver and dialect will be used
host='<clusterid>.xxxxxx.<aws-region>.redshift.amazonaws.com', # Amazon Redshift host
port=5439, # Amazon Redshift port
database='dev', # Amazon Redshift database
username='awsuser', # Amazon Redshift username
password='<pwd>' # Amazon Redshift password
) engine = sa.create_engine(url) Next, we will create a session using the already established engine above. Session = sa_orm.sessionmaker()
Session.configure(bind=engine)
session = Session() # Define Session-based Metadata
metadata = sa.MetaData(bind=session.bind)

使用 Amazon Redshift 数据类型创建数据库表并插入数据

使用新的 Amazon Redshift SQLAlchemy 方言,您可以创建具有 Amazon Redshift 特定数据类型(例如 SUPER、GEOMETRY、TIMESTAMPTZ 和 TIMETZ)的表。

在此步骤中,您将创建一个包含 TIMESTAMPTZ、TIMETZ 和 SUPER 数据类型的表。

或者,您可以定义表的分布方式、排序键和压缩编码。 请参见以下代码:

import datetime
import uuid
import random table_name = 'product_clickstream_tz' RedshiftDBTable = sa.Table(
table_name,
metadata,
sa.Column('session_id', sa.VARCHAR(80)),
sa.Column('click_region', sa.VARCHAR(100), redshift_encode='lzo'),
sa.Column('product_id', sa.BIGINT),
sa.Column('click_datetime', TIMESTAMPTZ),
sa.Column('stream_time', TIMETZ),
sa.Column ('order_detail', SUPER),
redshift_diststyle='KEY',
redshift_distkey='session_id',
redshift_sortkey='click_datetime'
) # Drop the table if it already exists
if sa.inspect(engine).has_table(table_name):
RedshiftDBTable.drop(bind=engine) # Create the table (execute the "CREATE TABLE" SQL statement for "product_clickstream_tz")
RedshiftDBTable.create(bind=engine) In this step, you will populate the table by preparing the insert command. # create sample data set
# generate a UUID for this row
session_id = str(uuid.uuid1()) # create Region information
click_region = "US / New York" # create Product information
product_id = random.randint(1,100000) # create a datetime object with timezone
click_datetime = datetime.datetime(year=2021, month=10, day=20, hour=10, minute=12, second=40, tzinfo=datetime.timezone.utc) # create a time object with timezone
stream_time = datetime.time(hour=10, minute=14, second=56, tzinfo=datetime.timezone.utc) # create SUPER information
order_detail = '[{"o_orderstatus":"F","o_clerk":"Clerk#0000001991","o_lineitems":[{"l_returnflag":"R","l_tax":0.03,"l_quantity":4,"l_linestatus":"F"}]}]' # create the insert SQL statement
insert_data_row = RedshiftDBTable.insert().values(
session_id=session_id,
click_region=click_region,
product_id=product_id,
click_datetime=click_datetime,
stream_time=stream_time,
order_detail=order_detail
) # execute the insert SQL statement
session.execute(insert_data_row)
session.commit()

从表中查询和获取结果

SQLAlchemy ORM 生成的 SELECT 语句由查询对象构造。 您可以使用多种不同的方法,例如 all(), first(), count(), order_by()join(). 以下屏幕截图显示了如何从查询表中检索所有行。

将 IPython SqlMagic 与 Amazon Redshift SQLAlchemy 方言结合使用

Amazon Redshift SQLAlchemy 方言现在支持 SqlMagic. 要建立连接,您可以使用以下命令构建 SQLAlchemy URL redshift_connector 司机。 有关更多信息 SqlMagic 可以用 GitHub上.

在下一节中,我们将演示如何使用 SqlMagic. 确保您拥有 ipython-sql 安装包; 如果没有,请运行以下命令安装它:

pip install ipython-sql

连接到 Amazon Redshift 并查询数据

在此步骤中,您构建 SQLAlchemy URL 以连接到 Amazon Redshift 并运行示例 SQL 查询。 对于这个演示,我们已经在集群中预填充了 TPCH 数据 GitHub上。 请参见以下代码:

import sqlalchemy as sa
from sqlalchemy.engine.url import URL
from sqlalchemy.orm import Session
%reload_ext sql
%config SqlMagic.displaylimit = 25 connect_to_db = URL.create(
drivername='redshift+redshift_connector', host='cluster.xxxxxxxx.region.redshift.amazonaws.com', port=5439, database='dev', username='awsuser', password='xxxxxx' )
%sql $connect_to_db
%sql select current_user, version();

您可以使用表格格式查看数据 pandas.DataFrame() 方法。

如果你安装了 matplotlib,你可以使用结果集的 .plot(), .pie().bar() 快速绘图的方法。

清理

确保在使用完 SQLAlchemy 资源后关闭并清理它们。 SQLAlchemy 使用连接池来提供对 Amazon Redshift 集群的访问。 打开后,默认行为使这些连接保持打开状态。 如果没有正确清理,这可能会导致集群出现连接问题。 使用以下代码清理您的资源:

session.close() # If the connection was accessed directly, ensure it is invalidated
conn = engine.connect()
conn.invalidate() # Clean up the engine
engine.dispose()

总结

在本文中,我们讨论了新的 Amazon Redshift SQLAlchemy 方言。 我们演示了它如何让您使用 SSO 安全地连接到 Amazon Redshift 数据库以及使用 SQLAlchemy URL 直接连接。 我们还演示了 SQLAlchemy 如何在不显式转换的情况下支持 TIMESTAMPTZ、TIMETZ 和 SUPER 数据类型。 我们还展示了如何 redshift_connector 和方言支持 SqlMagic 使用 Jupyter notebooks,这使您能够对 Amazon Redshift 运行交互式查询。


作者简介

苏梅特乔希 是纽约的分析专家解决方案架构师。 他擅长构建大型数据仓库解决方案。 他在数据仓库和分析领域拥有超过 16 年的经验。

布鲁克·怀特 是 AWS 的一名软件开发工程师。 通过她在 Amazon Redshift 驱动程序方面的工作,她使客户能够充分利用他们的数据。 在加入 AWS 之前,她在旧金山湾区的一家初创公司构建了 ETL 管道和分析 API。

来源:https://aws.amazon.com/blogs/big-data/use-the-amazon-redshift-sqlalchemy-dialect-to-interact-with-amazon-redshift/

时间戳记:

更多来自 AWS