オープンソース JDBC コネクタを使用した Amazon MSK Connect でデータをストリーミングする | アマゾン ウェブ サービス

オープンソース JDBC コネクタを使用した Amazon MSK Connect でデータをストリーミングする | アマゾン ウェブ サービス

ソースノード: 2097979

お客様が採用している Apache Kafka 用の Amazon マネージドサービス (Amazon MSK) を、エンタープライズ データ ハブを構築するための高速で信頼性の高いストリーミング プラットフォームとして利用します。 ストリーミング機能に加えて、Amazon MSK を設定すると、組織は疎結合で独立したコンポーネントによるデータ配布にパブリッシュ/サブスクモデルを使用できるようになります。

Apache Kafka クラスターと、検索インデックス、データベース、ファイル システムなどの他の外部システムの間でデータを公開および配布するには、以下をセットアップする必要があります。 Apache Kafka コネクトのオープンソース コンポーネントです。 アパッチカフカ フレームワークを使用して、さまざまなシステム間でデータを移動するためのコネクタをホストおよび実行します。 アップストリームおよびダウンストリームのアプリケーションの数が増加するにつれて、Apache Kafka Connect クラスターの管理、拡張、管理も複雑になります。 これらのスケーラビリティと管理性の問題に対処するには、 アマゾンMSKコネクト は、Apache Kafka Connect 用に構築されたフルマネージド コネクタをデプロイする機能を提供します。また、ワークロードの変化に合わせて自動的にスケーリングし、消費したリソースに対してのみ支払いを行う機能を備えています。

この投稿では、からデータをストリーミングするソリューションについて説明します。 MySQL 用 Amazon Relation Database Service (Amazon RDS) Amazon MSK Connect を使用してコネクタを設定およびデプロイすることで、リアルタイムで MSK クラスターに接続できます。

ソリューションの概要

私たちのユースケースでは、企業は複数のプロデューサー アプリケーションとコンシューマー アプリケーションを含む集中データ リポジトリを構築したいと考えています。 さまざまなツールやテクノロジーを使用してアプリケーションからのストリーミング データをサポートするために、ストリーミング プラットフォームとして Amazon MSK が選択されています。 現在、Amazon RDS for MySQL にデータを書き込んでいる主要なアプリケーションの XNUMX つでは、データを MSK トピックに公開し、同時にデータベースに書き込むために、大幅な設計変更が必要になります。 したがって、設計の変更を最小限に抑えるために、このアプリケーションは引き続き Amazon RDS for MySQL にデータを書き込みますが、このデータを集中ストリーミング プラットフォーム Amazon MSK と同期して、複数の下流消費者に対するリアルタイム分析を可能にするという追加要件も伴います。

このユースケースを解決するために、Amazon MSK の機能である Amazon MSK Connect を使用して、オープンソースの Amazon RDS for MySQL から MSK クラスターにデータを移動するためのフルマネージド Apache Kafka Connect コネクタをセットアップする次のアーキテクチャを提案します。 JDBC コネクタから ジャンクショ​​ン.

AWS環境をセットアップする

このソリューションをセットアップするには、いくつかの AWS リソースを作成する必要があります。 の AWS CloudFormation この投稿で提供されているテンプレートは、前提条件として必要なすべての AWS リソースを作成します。

次の表に、テンプレートに指定する必要があるパラメータを示します。

パラメータ名 Description キープ デフォルト値
スタック名 CloudFormation スタックの名前。 いいえ
DBインスタンスID RDS for MySQL インスタンスの名前。 いいえ
DB名 ストリーミング用のサンプル データを保存するデータベース名。 はい
DBインスタンスクラス RDS for MySQL インスタンスのインスタンス タイプ。 いいえ
DB割り当てストレージ DB インスタンスに割り当てられたサイズ (GiB)。 いいえ
DBUユーザー名 MySQL データベースにアクセスするためのデータベース ユーザー。 いいえ
DBパスワード MySQL データベースにアクセスするためのパスワード。 いいえ
JDBCConnectorPluginBukcetName MSK Connect コネクタの JAR ファイルとプラグインを保存するためのバケット。 いいえ
クライアントIPCIDR EC2 インスタンスに接続するクライアント マシンの IP アドレス。 いいえ
EC2KeyPair EC2 インスタンスで使用されるキーペア。 この EC2 インスタンスは、ローカル マシンから EC2 クライアント インスタンスに接続するためのプロキシとして使用されます。 いいえ
EC2ClientImageId Amazon Linux 2 の最新の AMI ID。この投稿ではデフォルト値をそのまま使用できます。 はい
VpcCIDR この VPC の IP 範囲 (CIDR 表記)。 いいえ
PrivateSubnetOneCIDR 最初のアベイラビリティーゾーンのプライベートサブネットの IP 範囲 (CIDR 表記)。 いいえ
PrivateSubnetTwoCIDR XNUMX 番目のアベイラビリティーゾーンのプライベートサブネットの IP 範囲 (CIDR 表記)。 いいえ
PrivateSubnetThreeCIDR XNUMX 番目のアベイラビリティーゾーンのプライベートサブネットの IP 範囲 (CIDR 表記)。 いいえ
パブリックサブネットCIDR パブリックサブネットのIP範囲(CIDR表記)。 いいえ

CloudFormation スタックを起動するには、次を選択します。 発射スタック:

CloudFormation テンプレートが完成し、リソースが作成されると、 出力 タブにはリソースの詳細が表示されます。

RDS for MySQL インスタンスのサンプル データを検証する

この使用例用のサンプル データを準備するには、次の手順を実行します。

  1. ローカル端末から次のコマンドを使用して、EC2 クライアント インスタンス MSKEC2Client に SSH 接続します。
    ssh -i <keypair> <user>@<hostname>

  2. 次のコマンドを実行して、データが正常にロードされたことを検証します。
    $ mysql -h <rds_instance_endpoint_name> -u <user_name> -p MySQL [(none)]> use dms_sample; MySQL [dms_sample]> select mlb_id, mlb_name, mlb_pos, mlb_team_long, bats, throws from mlb_data limit 5;

すべてのテーブルのデータを Amazon RDS から Amazon MSK に同期します

Amazon RDS から Amazon MSK にすべてのテーブルを同期するには、次の手順で Amazon MSK Connect マネージドコネクタを作成します。

  1. Amazon MSKコンソールで、 カスタムプラグイン 下のナビゲーションペインで MSKコネクト.
  2. 選択する カスタムプラグインを作成する.
  3. S3 URI – カスタム プラグイン オブジェクト、S3 バケット内の JDBC コネクタの confluentinc-kafka-connect-jdbc-plugin.zip (CloudFormation テンプレートによって作成された) という名前の ZIP ファイルを参照します。 bkt-msk-connect-plugins-<aws_account_id>.
  4. カスタムプラグイン名、 入る msk-confluent-jdbc-plugin-v1.
  5. オプションの説明を入力します。
  6. 選択する カスタムプラグインを作成する.

カスタム プラグインが正常に作成されると、次の場所で使用できるようになります。 アクティブ status

  1. 選択する コネクタ 下のナビゲーションペインで MSKコネクト.
  2. 選択する コネクタを作成する.
  3. 選択 既存のカスタム プラグインを使用 と下 カスタムプラグイン、プラグインを選択します msk-confluent-jdbc-plugin-v1 以前に作成したもの。
  4. 選択する Next.
  5. コネクタ名、 入る msk-jdbc-connector-rds-to-msk.
  6. オプションの説明を入力します。
  7. クラスタータイプ選択 MSK クラスター.
  8. MSKクラスターで、前に作成したクラスターを選択します。
  9. 認証、選択する わたし。
  10. コネクタ構成、次の設定を入力します。
    ### CONNECTOR SPECIFIC SETTINGS ### Provide the configuration properties to connect to source and destination endpoints including authentication ### mechanism, credentials and task details such as polling interval, source and destination object names, data ### transfer mode, parallelism ### Many of these properties are connector and end-point specific, so please review the connector documentation ### for more details connector.class=io.confluent.connect.jdbc.JdbcSourceConnector connection.user=admin connection.url=jdbc:mysql://<rds_instance_endpoint_name>:3306/dms_sample connection.password=XXXXX tasks.max=1 poll.interval.ms=300000 topic.prefix=rds-to-msk- mode=bulk connection.attempts=1 ### CONVERTING KAFKA MESSAGE BYTES TO JSON value.converter=org.apache.kafka.connect.json.JsonConverter key.converter=org.apache.kafka.connect.storage.StringConverter value.converter.schemas.enable=false key.converter.schemas.enable=false ###GENERIC AUTHENTICATION SETTINGS FOR KAFKA CONNECT security.protocol=SASL_SSL sasl.mechanism=AWS_MSK_IAM ssl.truststore.location=~/kafka.truststore.jks ssl.keystore.location=~/kafka.client.keystore.jks sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;

次の表は、前述のすべての構成オプションの簡単な概要を示しています。

設定オプション Description
コネクタ.クラス コネクタの JAVA クラス
接続.ユーザー MySQL エンドポイントで認証するためのユーザー名
接続.url MySQL エンドポイントのホスト名とポート番号を識別する JDBC URL
接続.パスワード MySQL エンドポイントで認証するためのパスワード
タスク.最大 このコネクタに対して起動されるタスクの最大数
ポーリング間隔.ms 各テーブルが新しいデータを取得するための後続のポーリング間の時間間隔 (ミリ秒)
トピック.プレフィックス MSK クラスターでトピックを作成するときに各テーブル名に追加するカスタム プレフィックス値
モード 各ポーリングの動作モード (バルク、タイムスタンプ、増分、またはタイムスタンプ+増分など)
接続の試行 JDBC接続の最大リトライ回数
セキュリティ.プロトコル 暗号化用に TLS を設定します
sasl.メカニズム 使用する SASL メカニズムを識別します
ssl.truststore.location 信頼できる証明書を保存する場所
ssl.keystore.location 秘密鍵を保管する場所
sasl.client.callback.handler.class 抽出された資格情報に基づいて SigV4 署名の構築をカプセル化します。
sasl.jaas.config SASL クライアント実装をバインドします。

  1. コネクタ容量 セクション、選択 自動スケール for 容量タイプ デフォルト値の 1 をそのまま使用します。 ワーカーあたりの MCU 数.
  2. セット4 最大ワーカー数 他のすべてのデフォルト値を維持します 労働者 および 自動スケーリング使用率のしきい値.
  3. ワーカー構成選択 MSK のデフォルト構成を使用する.
  4. アクセス許可、カスタム IAM ロールを選択します msk-connect-rds-jdbc-MSKConnectServiceIAMRole-* 以前に作成されました。
  5. ログ配信選択 Amazon CloudWatch Logs に配信する.
  6. ロググループ、ロググループを選択します msk-jdbc-source-connector 以前に作成されました。
  7. 選択する Next.
  8. レビューと作成、すべての設定を検証し、選択します コネクタを作成する.

コネクタが移行した後、 ランニング ステータスが変化すると、RDS インスタンスから MSK クラスターへのデータのフローが開始されるはずです。

データを検証する

データを検証して比較するには、次の手順を実行します。

  1. EC2 クライアント インスタンスへの SSH 接続 MSKEC2Client ローカル端末から次のコマンドを使用します。
    ssh -i <keypair> <user>@<hostname>

  2. IAM 認証を使用して MSK クラスターに接続するには、最新バージョンの aws-msk-iam-認証 クラスパス内の JAR ファイル:
    $ export CLASSPATH=/home/ec2-user/aws-msk-iam-auth-1.1.0-all.jar

  3. Amazon MSKコンソールで、 クラスター ナビゲーションペインでクラスタを選択します MSKConnect-msk-connect-rds-jdbc.
  4. ソフトウェア設定ページで、下図のように クラスタの概要 ページ、選択 クライアント情報を見る.
  5. クライアント情報を見る セクション ブートストラップ サーバーのプライベート エンドポイントをコピーします。 認証タイプ わたし。

  1. 最新バージョンの Apache Kafka インストールを操作し、Amazon MSK ブートストラップ サーバーに接続するための追加の環境変数を設定します。 <bootstrap servers> IAM 認証を使用して MSK クラスターに接続できるブートストラップ サーバーのリストです。
    $ export PATH=~/kafka/bin:$PATH $ cp ~/aws-msk-iam-auth-1.1.0-all.jar ~/kafka/libs/. $ export BOOTSTRAP_SERVERS=<bootstrap servers>

  2. という名前の構成ファイルをセットアップします client/properties 認証に使用されます:
    $ cd /home/ec2-user/kafka/config/ $ vi client.properties # Sets up TLS for encryption and SASL for authN. security.protocol = SASL_SSL # Identifies the SASL mechanism to use. sasl.mechanism = AWS_MSK_IAM # Binds SASL client implementation. sasl.jaas.config = software.amazon.msk.auth.iam.IAMLoginModule required; # Encapsulates constructing a SigV4 signature based on extracted credentials. # The SASL client bound by "sasl.jaas.config" invokes this class. sasl.client.callback.handler.class = software.amazon.msk.auth.iam.IAMClientCallbackHandler

  3. MSK クラスターで作成されたトピックのリストを検証します。
    $ cd /home/ec2-user/kafka/ $ bin/kafka-topics.sh --list --bootstrap-server $BOOTSTRAP_SERVERS --command-config /home/ec2-user/kafka/config/client.properties

  1. データが MSK クラスター内のトピックにロードされたことを検証します。
    $ bin/kafka-console-consumer.sh --topic rds-to-msk-seat --from-beginning --bootstrap-server $BOOTSTRAP_SERVERS --consumer.config /home/ec2-user/kafka/config/client.properties

Amazon RDS へのクエリを使用してデータを同期し、Amazon MSK に書き込む

Amazon RDS for MySQL の複数のテーブルを結合してデータを平坦化するクエリの結果を同期するには、次の手順で Amazon MSK Connect マネージドコネクタを作成します。

  1. Amazon MSK コンソールで、選択します コネクタ 下のナビゲーションペインで MSKコネクト.
  2. 選択する コネクタを作成する.
  3. 選択 既存のカスタム プラグインを使用 と下 カスタムプラグイン、選択 pluginmsk-confluent-jdbc-plugin-v1.
  4. コネクタ名、 入る msk-jdbc-connector-rds-to-msk-query.
  5. オプションの説明を入力します。
  6. クラスタータイプ選択 MSK クラスター.
  7. MSKクラスターで、前に作成したクラスターを選択します。
  8. 認証、選択する わたし。
  9. コネクタ構成、次の設定を入力します。
    ### CONNECTOR SPECIFIC SETTINGS connector.class=io.confluent.connect.jdbc.JdbcSourceConnector connection.user=admin connection.url=jdbc:mysql://<rds_instance_endpoint_name>:3306/dms_sample connection.password=XXXXX tasks.max=1 poll.interval.ms=300000 topic.prefix=rds-to-msk-query-topic mode=bulk connection.attempts=1 query=select last_name, name as team_name, sport_type_name, sport_league_short_name, sport_division_short_name from dms_sample.sport_team join dms_sample.player on player.sport_team_id = sport_team.id; ### CONVERTING KAFKA MESSAGE BYTES TO JSON value.converter=org.apache.kafka.connect.json.JsonConverter key.converter=org.apache.kafka.connect.storage.StringConverter value.converter.schemas.enable=false key.converter.schemas.enable=false ###GENERIC AUTHENTICATION SETTINGS FOR KAFKA CONNECT security.protocol=SASL_SSL sasl.mechanism=AWS_MSK_IAM ssl.truststore.location=~/kafka.truststore.jks ssl.keystore.location=~/kafka.client.keystore.jks sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;

  10. コネクタ容量 セクション、選択 自動スケール for 容量タイプ デフォルト値の 1 をそのまま使用します。 ワーカーあたりの MCU 数.
  11. セット4 最大ワーカー数 他のすべてのデフォルト値を維持します 労働者 および 自動スケーリング使用率のしきい値.
  12. ワーカー構成選択 MSK のデフォルト構成を使用する.
  13. アクセス許可、カスタム IAM ロールを選択します role_msk_connect_serivce_exec_custom.
  14. ログ配信選択 Amazon CloudWatch Logs に配信する.
  15. ロググループ、前に作成したログ グループを選択します。
  16. 選択する Next.
  17. レビューと作成、すべての設定を検証し、選択します コネクタを作成する.

コネクタが移行すると、 ランニング ステータスが変化すると、RDS インスタンスから MSK クラスターへのデータのフローが開始されるはずです。

  1. データを検証するには、EC2 クライアント インスタンス MSKEC2Client に SSH で接続し、次のコマンドを実行してトピック内のデータを確認します。
    $ bin/kafka-console-consumer.sh --topic rds-to-msk-query-topic --from-beginning --bootstrap-server $BOOTSTRAP_SERVERS --consumer.config /home/ec2-user/kafka/config/client.properties

クリーンアップ

リソースをクリーンアップして継続的な料金を回避するには、次の手順を実行します。

  1. Amazon MSKコンソールで、 コネクタ 下のナビゲーションペインで MSKコネクト.
  2. 作成したコネクタを選択し、 削除.
  3. Amazon S3コンソールで、 バケット ナビゲーションペインに表示されます。
  4. 命名規則に従ってバケットを検索する bkt-msk-connect-plugins-<aws_account_id>.
  5. このバケット内のすべてのフォルダーとオブジェクトを削除します。
  6. すべての内容を削除した後、バケットを削除します。
  7. CloudFormation スタックを使用して作成された他のすべてのリソースを削除するには、AWS CloudFormation コンソールからスタックを削除します。

まとめ

Amazon MSK Connect は、必要なリソースをプロビジョニングし、コネクタの健全性と配信状態を監視し、基盤となるハードウェアを維持し、コネクタを自動スケールしてワークロードのバランスを整えるフルマネージド型サービスです。 この投稿では、RDS for MySQL インスタンスと MSK クラスターの間でデータをストリーミングするために、Confluent からオープンソース JDBC コネクタをセットアップする方法を説明しました。 また、すべてのテーブルを同期するためのさまざまなオプションや、クエリベースのアプローチを使用して非正規化データを MSK トピックにストリーミングするためのさまざまなオプションも検討しました。

Amazon MSK Connect の詳細については、以下を参照してください。 MSK Connect の使用を開始する.


著者について

マニッシュ・ヴィルワニ AWS のシニア ソリューション アーキテクトです。 彼は、大規模なビッグデータおよび分析ソリューションの設計と実装に XNUMX 年以上の経験があります。 彼は、一部の主要な AWS 顧客およびパートナーに技術指導、設計アドバイス、思想的リーダーシップを提供しています。

インディラ・バラクリシュナン AWS アナリティクス スペシャリスト SA チームのプリンシパル ソリューション アーキテクトです。 彼女は、顧客がクラウドベースの分析ソリューションを構築して、データ主導の意思決定を使用してビジネス上の問題を解決できるよう支援することに情熱を注いでいます。 仕事以外では、子供たちの活動にボランティアとして参加し、家族との時間を過ごしています。

タイムスタンプ:

より多くの AWSビッグデータ