GretelとApacheAirflowを使用して合成データパイプラインを構築する

ソースノード: 1068200

GretelとApacheAirflowを使用して合成データパイプラインを構築する

このブログ投稿では、GretelのSynthetic DataAPIとApacheAirflowを使用してPostgreSQLデータベースから合成データを生成するETLパイプラインを構築します。


By ニューベリーを描いた、Gretel.aiのソフトウェアエンジニア

GretelとApacheAirflowを使用して合成データパイプラインを構築する

みなさん、私の名前はドリューです。私はここグレーテルのソフトウェアエンジニアです。 私は最近、Gretel APIを既存のツールに統合して、セキュリティと顧客のプライバシーが単なる後付けやチェックボックスではなく、一流の機能であるデータパイプラインを簡単に構築できるようにするためのパターンについて考えています。

Gretelのエンジニアと顧客の間で人気のあるデータエンジニアリングツールのXNUMXつは、ApacheAirflowです。 また、Gretelでもうまく機能します。 このブログ投稿では、Airflow、Gretel、PostgreSQLを使用して合成データパイプラインを構築する方法を紹介します。 飛び込みましょう!

エアフローとは

 
 
エアフロー は、データパイプラインの構築に一般的に使用されるワークフロー自動化ツールです。 これにより、データエンジニアまたはデータサイエンティストは、Pythonやその他の使い慣れた構造を使用して、これらのパイプラインをプログラムで定義およびデプロイできます。 Airflowの中核となるのは、DAG、つまり有向非巡回グラフの概念です。 Airflow DAGは、パイプラインコンポーネント、それらの依存関係、および実行順序を定義するためのモデルとAPIのセットを提供します。

製品データベースからデータウェアハウスにデータを複製するAirflowパイプラインが見つかる場合があります。 他のパイプラインは、正規化されたデータを分析またはモデリングに適した単一のデータセットに結合するクエリを実行する場合があります。 さらに別のパイプラインは、主要なビジネス指標を集約した日次レポートを公開する場合があります。 これらのユースケースで共有される共通のテーマは、システム間でのデータの移動を調整することです。 これがAirflowの魅力です。

エアフローとその豊かなエコシステムを活用する 統合、データエンジニアと科学者は、さまざまなツールやサービスを、保守と運用が容易な単一の統合パイプラインに統合できます。 これらの統合機能を理解した上で、GretelをAirflowパイプラインに統合して、一般的なデータ操作ワークフローを改善する方法について説明します。

グレーテルはどのように適合しますか?

 
 
グレーテルの使命は、データをより簡単かつ安全に操作できるようにすることです。 顧客と話すとき、私たちがよく耳にする問題のXNUMXつは、データサイエンティストが機密データにアクセスできるようにするために必要な時間と労力です。 使用する グレーテルシンセティックス、データセットの合成コピーを生成することで、機密データを扱うリスクを減らすことができます。 GretelをAirflowと統合することで、データサイエンティストが新しいデータ要求ごとにデータエンジニアを必要とせずに、必要なデータをすばやく取得できるセルフサービスパイプラインを作成できます。

これらの機能を実証するために、データベースからユーザーアクティビティ機能を抽出し、データセットの合成バージョンを生成し、データセットをS3に保存するETLパイプラインを構築します。 S3に保存された合成データセットを使用すると、データサイエンティストは、顧客のプライバシーを損なうことなく、ダウンストリームのモデリングや分析に使用できます。

物事を開始するために、最初にパイプラインの鳥瞰図を見てみましょう。 この図の各ノードは、パイプラインステップ、つまりエアフロー用語での「タスク」を表しています。



AirflowのGretel合成パイプラインの例。

 

ETLパイプラインに見られるものと同様に、パイプラインを3つの段階に分割できます。

  • エキス – extract_featuresタスクはデータベースにクエリを実行し、データをデータサイエンティストがモデルを構築するために使用できる一連の機能に変換します。
  • 合成する – generate_synthetic_featuresは、抽出された特徴を入力として受け取り、合成モデルをトレーニングしてから、GretelAPIとクラウドサービスを使用して特徴の合成セットを生成します。
  • 負荷 – upload_synthetic_featuresは、機能の合成セットをS3に保存し、そこでダウンストリームモデルまたは分析に取り込むことができます。

次のいくつかのセクションでは、これらXNUMXつのステップのそれぞれについて詳しく説明します。 各コードサンプルをフォローしたい場合は、次のURLにアクセスしてください。 gretelai / gretel-airflow-pipelines このブログ投稿で使用されているすべてのコードをダウンロードしてください。 リポジトリには、Airflowインスタンスを開始し、パイプラインをエンドツーエンドで実行するために従うことができる手順も含まれています。

さらに、各コンポーネントを分析する前に、Airflowパイプライン全体を確認すると役立つ場合があります。 dags / airbnb_user_bookings.py。 次のセクションのコードスニペットは、リンクされたユーザー予約パイプラインから抽出されます。

特徴の抽出

 
 
最初のタスクであるextract_featuresは、ソースデータベースから生データを抽出し、それを一連の機能に変換する役割を果たします。 これは一般的です 特徴エンジニアリング 機械学習または分析パイプラインで見つかる可能性のある問題。

このパイプラインの例では、PostgreSQLデータベースをプロビジョニングし、 AirbnbKaggleコンペティション.

このデータセットには、UsersとSessionsのXNUMXつのテーブルが含まれています。 セッションには、外部キー参照user_idが含まれています。 この関係を使用して、ユーザーごとに集計されたさまざまな予約指標を含む一連の機能を作成します。 次の図は、機能の構築に使用されるSQLクエリを表しています。

WITH session_features_by_user AS(SELECT user_id、count(*)AS number_of_actions_taken、count(DISTINCT action_type)AS number_of_unique_actions、round(avg(secs_elapsed))AS avg_session_time_seconds、round(max(secs_elapsed))AS max_session_time_seconds、 min_session_time_seconds、(SELECT count(*)FROM session s WHERE s.user_id = user_id AND s.action_type = 'booking_request')AS total_bookings FROM session GROUP BY user_id)SELECT u.id AS user_id、u.gender、u.age、u .language、u.signup_method、u.date_account_created、s.number_of_actions_taken、s.number_of_unique_actions、s.avg_session_time_seconds、s.min_session_time_seconds、s.max_session_time_seconds FROM session_features_by_user s LEFT JOIN


次に、SQLクエリがAirflowパイプラインから実行され、次のタスク定義を使用して中間のS3ロケーションに書き込まれます。

@task()def extract_features(sql_file:str)-> str:context = get_current_context()sql_query = Path(sql_file).read_text()key = f "{context ['dag_run']。run_id} _booking_features.csv" with NamedTemporaryFile (mode = "r +"、suffix = "。csv")as tmp_csv:postgres.copy_expert(f "copy({sql_query})to stdout with csv header"、tmp_csv.name)s3.load_file(filename = tmp_csv.name、 key = key、)return key


タスクへの入力sql_fileは、データベースで実行するクエリを決定します。 このクエリはタスクに読み込まれ、データベースに対して実行されます。 次に、クエリの結果がS3に書き込まれ、リモートファイルキーがタスクの出力として返されます。

以下のスクリーンショットは、上からの抽出クエリのサンプル結果セットを示しています。 次のセクションでは、このデータセットの合成バージョンを作成する方法について説明します。



クエリ結果のプレビュー。

GretelAPIを使用して機能を合成する

 
 
各機能の合成バージョンを生成するには、最初に合成モデルをトレーニングしてから、モデルを実行して合成レコードを生成する必要があります。 Gretelには、Airflowタスクへの統合を容易にするPythonSDKのセットがあります。

PythonクライアントSDKに加えて、 グレーテルエアフローフック GretelAPI接続とシークレットを管理します。 Gretel Airflow接続を設定した後、GretelAPIへの接続は次のように簡単です。

hooks.gretelからインポートGretelHookgretel = GretelHook()project = gretel.get_project()


Airflow接続を構成する方法の詳細については、Githubリポジトリを参照してください。 README.

上記の例のプロジェクト変数は、GretelのAPIを使用して合成モデルをトレーニングおよび実行するためのメインエントリポイントとして使用できます。 詳細については、 PythonAPIドキュメント.

予約パイプラインに戻って、generate_synthetic_featuresタスクを確認します。 このステップでは、前のタスクで抽出された特徴を使用して合成モデルをトレーニングします。

@task()def generate_synthetic_features(data_source:str)-> str:project = gretel.get_project()model = project.create_model_obj(model_config = "synthetics / default"、data_source = s3.download_file(data_source))model.submit_cloud() poll(model)return model.get_artifact_link( "data_preview")


メソッドシグネチャを見ると、パスdata_sourceをとっていることがわかります。 この値は、前のステップで抽出されたS3特徴を指します。 後のセクションでは、これらすべての入力と出力がどのように相互に配線されるかについて説明します。

project.create_model_objを使用してモデルを作成する場合、model_config paramは、モデルの生成に使用される合成モデル構成を表します。 このパイプラインでは、 デフォルトのモデル構成、しかし他の多く 設定オプション ご利用いただけます。

モデルが構成された後、model.submit_cloud()を呼び出します。 これにより、GretelCloudを使用したトレーニングとレコード生成のモデルが送信されます。 poll(model)を呼び出すと、モデルがトレーニングを完了するまでタスクがブロックされます。

モデルがトレーニングされたので、get_artifact_linkを使用して、生成された合成機能をダウンロードするためのリンクを返します。



機能の合成セットのデータプレビュー。

 

このアーティファクトリンクは、upload_synthetic_featuresの最終ステップへの入力として使用されます。

合成機能をロードする

 
 
元の特徴が抽出され、合成バージョンが作成されました。 次に、合成機能をアップロードして、ダウンストリームのコンシューマーがアクセスできるようにします。 この例では、データセットの最終的な宛先としてS3バケットを使用します。

@task()def upload_synthetic_features(data_set:str):context = get_current_context()with open(data_set、 "rb")as synth_features:s3.load_file_obj(file_obj = synth_features、key = f "{..._ booking_features_synthetic.csv"、 )


このタスクは非常に簡単です。 data_set入力値には、GretelのAPIから合成データセットをダウンロードするための署名付きHTTPリンクが含まれています。 タスクはそのファイルをAirflowワーカーに読み込み、設定済みのS3フックを使用して、ダウンストリームのコンシューマーまたはモデルがアクセスできるS3バケットに合成機能ファイルをアップロードします。

パイプラインの調整

 
 
最後のXNUMXつのセクションでは、データセットの抽出、合成、読み込みに必要なすべてのコードについて説明しました。 最後のステップは、これらの各タスクをXNUMXつのAirflowパイプラインにまとめることです。

この投稿の冒頭に戻って思い出すと、DAGの概念について簡単に説明しました。 AirflowのTaskFlowAPIを使用して、これらXNUMXつのPythonメソッドをDAGに構成し、各ステップが実行される入力、出力、および順序を定義できます。

feature_path = extract_features( "/opt/airflow/dags/sql/session_rollups__by_user.sql")synthetic_data = generate_synthetic_features(feature_path)upload_synthetic_features(synthetic_data)


これらのメソッド呼び出しのパスをたどると、最終的には元の機能パイプラインのようなグラフが得られます。



AirflowのGretel合成パイプライン。

 

このパイプラインを実行して、実際の動作を確認したい場合は、 付随するGithubリポジトリ。 ここには、Airflowインスタンスを開始し、パイプラインをエンドツーエンドで実行する方法の説明があります。

ラッピング

 
 
ここまで進んだら、GretelをAirflow上に構築されたデータパイプラインに統合する方法を見てきました。 Gretelの開発者向けAPIと、Airflowの強力なフックとオペレーターのシステムを組み合わせることで、データをよりアクセスしやすく安全に使用できるETLパイプラインを簡単に構築できます。

また、機密データに簡単にアクセスできない可能性がある一般的な機能エンジニアリングのユースケースについても説明しました。 データセットの合成バージョンを生成することで、機密データが公開されるリスクを軽減しながら、データセットの有用性を維持しながら、必要な人がすぐに利用できるようにします。

機能パイプラインをより抽象的な用語で考えると、新しいSQLクエリをいくつでも再利用できるパターンができました。 パイプラインの新しいバージョンをデプロイし、最初のSQLクエリを交換することで、機密性の高いクエリの前に、顧客のプライバシーを保護する合成データセットを使用できます。 変更する必要があるコードの行は、SQLファイルへのパスだけです。 複雑なデータエンジニアリングは必要ありません。

読んでくれてありがとう

 
 
にメールを送信 こんにちは@gretel.ai または参加してください Slack  ご質問やご意見がございましたら。 Airflowの使用方法と、既存のデータパイプラインとの最適な統合方法についてお聞かせください。

 
バイオ: ニューベリーを描いた Gretel.aiのソフトウェアエンジニアです。

元の。 許可を得て転載。

関連する

ソース:https://www.kdnuggets.com/2021/09/build-synthetic-data-pipeline-gretel-apache-airflow.html

タイムスタンプ:

より多くの KDナゲット