Amazon Redshift SQLAlchemy 方言を使用して Amazon Redshift と対話する

ソースノード: 1570489

Amazonレッドシフト は、データを大規模に分析できる、高速でスケーラブルで安全なフル マネージドのクラウド データ ウェアハウスです。 いくつかの異なる方法で Amazon Redshift データベースを操作できます。 XNUMX つの方法は、オブジェクト リレーショナル マッピング (ORM) フレームワークを使用することです。 ORM は、データベース上の抽象化レイヤーとして開発者によって広く使用されています。これにより、SQL を記述する代わりに、好みのプログラミング言語でコードを記述できます。 SQLAlchemy は、Python コードとデータベース間の相互作用を可能にする、人気のある Python ORM フレームワークです。

SQLAlchemy 方言は、さまざまな種類の DBAPI 実装およびデータベースと通信するために使用されるシステムです。 以前は、Amazon Redshift の SQLAlchemy ダイアレクトは、データベースとの通信に psycopg2 を使用していました。 psycopg2 は Postgres コネクタであるため、次のような Amazon Redshift 固有の機能をサポートしていません。 AWS IDおよびアクセス管理 安全な接続のための (IAM) 認証と、SUPER や GEOMETRY などの Amazon Redshift 固有のデータ型。 新しい Amazon Redshift SQLAlchemy ダイアレクトは、Amazon Redshift Python ドライバー (redshift_connector) であり、Amazon Redshift データベースに安全に接続できます。 IAM 認証とシングル サインオン (SSO) をネイティブにサポートします。 また、SUPER、GEOMETRY、TIMESTAMPTZ、TIMETZ などの Amazon Redshift 固有のデータ型もサポートしています。

この投稿では、新しい Amazon Redshift SQLAlchemy ダイアレクトを使用して Amazon Redshift データベースを操作する方法について説明します。 Okta を使用して安全に接続し、さまざまな DDL および DML 操作を実行する方法を示します。 新しい Amazon Redshift SQLAlchemy ダイアレクトが使用するため redshift_connector、このパッケージのユーザーは、によって提供される接続オプションを最大限に活用できます。 redshift_connector、IAM および ID プロバイダー (IdP) プラグインを介した認証など。 さらに、IPython のサポートもデモンストレーションします。 SqlMagicこれにより、Jupyter ノートブックから直接インタラクティブな SQL クエリを簡単に実行できます。

前提条件

この投稿の前提条件は次のとおりです。

Amazon Redshift SQLAlchemy 方言を使ってみる

Python 用の Amazon Redshift SQLAlchemy 方言を使い始めるのは簡単です。 をインストールできます。 sqlalchemy-redshift pipを使ったライブラリ。 これを実証するために、Jupyter ノートブックから始めます。 次の手順を完了します。

  1. ノートブックインスタンスを作成する (この投稿では、それを呼び出します redshift-sqlalchemy).
  2. Amazon SageMakerコンソールの、 ノート ナビゲーション ペインで、 ノートブックインスタンス.
  3. 作成したインスタンスを見つけて選択します Jupyterを開く.
  4. ノートブック インスタンスを開き、新しい conda_python3 Jupyterノートブック。
  5. 次のコマンドを実行してインストールします sqlalchemy-redshift および redshift_connector:
pip install sqlalchemy-redshift
pip install redshift_connector


redshift_connector は、Amazon Redshift クラスターへのアクセス方法をカスタマイズするのに役立つさまざまな接続オプションを提供します。 詳細については、次を参照してください。 接続パラメータ.

Amazon Redshift クラスターに接続する

このステップでは、XNUMX つの異なる方法 (Okta SSO フェデレーション、およびデータベース ユーザーとパスワードを使用した直接接続) を使用して Amazon Redshift クラスターに接続する方法を示します。

Okta SSO フェデレーションに接続する

前提条件として、Okta 設定で Amazon Redshift アプリケーションをセットアップします。 詳細については、次を参照してください。 ID プロバイダーとして Okta を使用して Amazon Redshift アクセスをフェデレーションする.

Amazon Redshift クラスターへの接続を確立するために、 create_engine 関数。 SQL錬金術 create_engine() 関数は、URL に基づいてエンジン オブジェクトを生成します。 の sqlalchemy-redshift パッケージは、Amazon Redshift クラスターへの接続を確立するために使用できる RFC-1738 準拠の URL を作成するためのカスタム インターフェイスを提供します。

次のコードに示すように、SQLAlchemy URL を作成します。 URL.create() SQLAlchemy バージョン 1.4 以降で使用できます。 IAM を使用して認証する場合、ユーザーがホストとポートを指定する必要はありません。 SSO フェデレーションを使用して安全に Amazon Redshift に接続するには、URL で Okta のユーザー名とパスワードを使用します。

import sqlalchemy as sa
from sqlalchemy.engine.url import URL
from sqlalchemy import orm as sa_orm from sqlalchemy_redshift.dialect import TIMESTAMPTZ, TIMETZ # build the sqlalchemy URL. When authenticating using IAM, the host
# and port do not need to be specified by the user.
url = URL.create(
drivername='redshift+redshift_connector', # indicate redshift_connector driver and dialect will be used
database='dev', # Amazon Redshift database
username='johnd@example.com', # Okta username
password='<PWD>' # Okta password
) # a dictionary is used to store additional connection parameters
# that are specific to redshift_connector or cannot be URL encoded.
conn_params = { "iam": True, # must be enabled when authenticating via IAM "credentials_provider": "OktaCredentialsProvider", "idp_host": "<prefix>.okta.com", "app_id": "<appid>", "app_name": "amazon_aws_redshift", "region": "<region>", "cluster_identifier": "<clusterid>", "ssl_insecure": False, # ensures certificate verification occurs for idp_host
} engine = sa.create_engine(url, connect_args=conn_params)

Amazon Redshift データベースのユーザーとパスワードで接続する

データベースのユーザーとパスワードを使用して、Amazon Redshift クラスターに接続できます。 URL を作成し、 URL.create() 次のコードに示すように、コンストラクター:

import sqlalchemy as sa
from sqlalchemy.engine.url import URL # build the sqlalchemy URL
url = URL.create(
drivername='redshift+redshift_connector', # indicate redshift_connector driver and dialect will be used
host='<clusterid>.xxxxxx.<aws-region>.redshift.amazonaws.com', # Amazon Redshift host
port=5439, # Amazon Redshift port
database='dev', # Amazon Redshift database
username='awsuser', # Amazon Redshift username
password='<pwd>' # Amazon Redshift password
) engine = sa.create_engine(url) Next, we will create a session using the already established engine above. Session = sa_orm.sessionmaker()
Session.configure(bind=engine)
session = Session() # Define Session-based Metadata
metadata = sa.MetaData(bind=session.bind)

Amazon Redshift データ型を使用してデータベース テーブルを作成し、データを挿入する

新しい Amazon Redshift SQLAlchemy ダイアレクトを使用すると、SUPER、GEOMETRY、TIMESTAMPTZ、TIMETZ などの Amazon Redshift 固有のデータ型でテーブルを作成できます。

この手順では、TIMESTAMPTZ、TIMETZ、および SUPER データ型のテーブルを作成します。

必要に応じて、テーブルの分散スタイル、ソート キー、および圧縮エンコーディングを定義できます。 次のコードを参照してください。

import datetime
import uuid
import random table_name = 'product_clickstream_tz' RedshiftDBTable = sa.Table(
table_name,
metadata,
sa.Column('session_id', sa.VARCHAR(80)),
sa.Column('click_region', sa.VARCHAR(100), redshift_encode='lzo'),
sa.Column('product_id', sa.BIGINT),
sa.Column('click_datetime', TIMESTAMPTZ),
sa.Column('stream_time', TIMETZ),
sa.Column ('order_detail', SUPER),
redshift_diststyle='KEY',
redshift_distkey='session_id',
redshift_sortkey='click_datetime'
) # Drop the table if it already exists
if sa.inspect(engine).has_table(table_name):
RedshiftDBTable.drop(bind=engine) # Create the table (execute the "CREATE TABLE" SQL statement for "product_clickstream_tz")
RedshiftDBTable.create(bind=engine) In this step, you will populate the table by preparing the insert command. # create sample data set
# generate a UUID for this row
session_id = str(uuid.uuid1()) # create Region information
click_region = "US / New York" # create Product information
product_id = random.randint(1,100000) # create a datetime object with timezone
click_datetime = datetime.datetime(year=2021, month=10, day=20, hour=10, minute=12, second=40, tzinfo=datetime.timezone.utc) # create a time object with timezone
stream_time = datetime.time(hour=10, minute=14, second=56, tzinfo=datetime.timezone.utc) # create SUPER information
order_detail = '[{"o_orderstatus":"F","o_clerk":"Clerk#0000001991","o_lineitems":[{"l_returnflag":"R","l_tax":0.03,"l_quantity":4,"l_linestatus":"F"}]}]' # create the insert SQL statement
insert_data_row = RedshiftDBTable.insert().values(
session_id=session_id,
click_region=click_region,
product_id=product_id,
click_datetime=click_datetime,
stream_time=stream_time,
order_detail=order_detail
) # execute the insert SQL statement
session.execute(insert_data_row)
session.commit()

テーブルからクエリを実行して結果を取得する

SQLAlchemy ORM によって生成される SELECT ステートメントは、クエリ オブジェクトによって構築されます。 次のようないくつかの異なる方法を使用できます。 all(), first(), count(), order_by(), join(). 次のスクリーンショットは、クエリを実行したテーブルからすべての行を取得する方法を示しています。

Amazon Redshift SQLAlchemy 方言で IPython SqlMagic を使用する

Amazon Redshift SQLAlchemy ダイアレクトがサポートされるようになりました SqlMagic. 接続を確立するには、SQLAlchemy URL を redshift_connector 運転者。 についての詳細情報 SqlMagic で提供されています GitHubの.

次のセクションでは、使用方法を示します。 SqlMagic。を持っていることを確認してください。 ipython-sql パッケージがインストールされました。 そうでない場合は、次のコマンドを実行してインストールします。

pip install ipython-sql

Amazon Redshift に接続してデータをクエリする

このステップでは、SQLAlchemy URL を作成して Amazon Redshift に接続し、サンプル SQL クエリを実行します。 このデモでは、クラスター内の TPCH データを事前に取り込みました。 GitHubの。 次のコードを参照してください。

import sqlalchemy as sa
from sqlalchemy.engine.url import URL
from sqlalchemy.orm import Session
%reload_ext sql
%config SqlMagic.displaylimit = 25 connect_to_db = URL.create(
drivername='redshift+redshift_connector', host='cluster.xxxxxxxx.region.redshift.amazonaws.com', port=5439, database='dev', username='awsuser', password='xxxxxx' )
%sql $connect_to_db
%sql select current_user, version();

を使用して、データを表形式で表示できます。 pandas.DataFrame() 方法。

matplotlib をインストールした場合は、結果セットの .plot(), .pie(), .bar() クイック プロットのメソッド。

クリーンアップ

SQLAlchemy のリソースを使い終わったら、それらのリソースが閉じられ、クリーンアップされていることを確認してください。 SQLAlchemy は接続プールを使用して、Amazon Redshift クラスターへのアクセスを提供します。 開くと、既定の動作では、これらの接続は開いたままになります。 適切にクリーンアップしないと、クラスターで接続の問題が発生する可能性があります。 次のコードを使用して、リソースをクリーンアップします。

session.close() # If the connection was accessed directly, ensure it is invalidated
conn = engine.connect()
conn.invalidate() # Clean up the engine
engine.dispose()

まとめ

この投稿では、新しい Amazon Redshift SQLAlchemy ダイアレクトについて説明しました。 SSO を使用して Amazon Redshift データベースに安全に接続する方法と、SQLAlchemy URL を使用して直接接続する方法を示しました。 また、SQLAlchemy が TIMESTAMPTZ、TIMETZ、および SUPER データ型を明示的にキャストせずにサポートする方法も示しました。 方法も紹介しました redshift_connector そして方言サポート SqlMagic Jupyter ノートブックを使用すると、Amazon Redshift に対してインタラクティブなクエリを実行できます。


著者について

スメットジョシ は、ニューヨークを拠点とする分析スペシャリスト ソリューション アーキテクトです。 彼は、大規模なデータ ウェアハウジング ソリューションの構築を専門としています。 彼は、データ ウェアハウジングと分析分野で 16 年以上の経験があります。

ブルック・ホワイト AWS のソフトウェア開発エンジニアです。 彼女は、Amazon Redshift ドライバーに関する作業を通じて、顧客がデータを最大限に活用できるようにしています。 AWS の前は、サンフランシスコのベイエリアのスタートアップで ETL パイプラインと分析 API を構築しました。

ソース: https://aws.amazon.com/blogs/big-data/use-the-amazon-redshift-sqlalchemy-dialect-to-interact-with-amazon-redshift/

タイムスタンプ:

より多くの AWS