今日、あらゆる業種のあらゆる規模の企業が、リアルタイムストリーミングとストリーム処理を中心としたイベント駆動型アーキテクチャを設計および構築しています。 ApacheKafkaのAmazonマネージドストリーミング (Amazon MSK)は、以下を使用するアプリケーションの構築と実行を容易にするフルマネージドサービスです。 アパッチカフカ ストリーミングおよびイベントデータを処理します。 Apache Kafkaは、リアルタイムストリーミングデータパイプラインとアプリケーションを構築するためのオープンソースプラットフォームです。 Amazon MSKを使用すると、ネイティブのApache Kafka APIを引き続き使用して、イベント駆動型アーキテクチャを構築し、データベースとの間で変更をストリーミングし、機械学習および分析アプリケーションを強化できます。
IoTデバイスからのデータのキャプチャと分析、車両や貨物の追跡と監視、医療施設の患者の監視、金融取引の監視など、さまざまな業界や組織でストリーミングを適用できます。
この投稿では、AmazonMSKを使用してリアルタイムストリーム処理アプリケーションを構築する方法について説明します。 AWSファーゲート、およびApache Kafka StreamsAPI。 NS Kafka ストリーム API は、ストリームアプリケーションの開発を簡素化するクライアントライブラリです。 舞台裏では、KafkaStreamsライブラリは実際には標準を抽象化したものです カフカプロデューサー および KafkaコンシューマーAPI。 Kafka Streamsライブラリを使用してアプリケーションを構築すると、データストリームは自動的にフォールトトレラントになり、アプリケーションのインスタンス全体に透過的かつ弾力的に分散されます。 KafkaStreamsアプリケーションは AmazonMSKでサポート。 Fargateは、次のようなAWSコンテナオーケストレーションサービスと連携するコンテナ用のサーバーレスコンピューティングエンジンです。 Amazon エラスティック コンテナ サービス (Amazon ECS)。これにより、コンテナ化されたアプリケーションを簡単に実行、スケーリング、保護できます。
Fargateを使用すると、アプリケーションの構築に集中しやすくなるため、KafkaStreamsアプリケーションをFargateで実行することを選択しました。 Fargateを使用すると、サーバーをプロビジョニングおよび管理する必要がなくなり、アプリケーションごとにリソースを指定して支払うことができ、設計によるアプリケーションの分離によってセキュリティが向上します。 Fargateは適切な量のコンピューティングを割り当て、インスタンスを選択してクラスター容量をスケーリングする必要をなくします。 コンテナの実行に必要なリソースに対してのみ料金を支払うため、過剰なプロビジョニングや追加のサーバーの料金を支払う必要はありません。 Fargateは、各タスクまたはポッドを独自のカーネルで実行し、タスクとポッドに独自の分離されたコンピューティング環境を提供します。 これにより、アプリケーションでワークロードを分離し、設計によりセキュリティを向上させることができます。
アーキテクチャの概要
当社のストリーミングアプリケーションアーキテクチャは、Twitter Stream APIに接続し、ツイートを読み取り、AmazonMSKに公開するストリームプロデューサーで構成されています。 Kafka Streamsプロセッサは、これらのメッセージを消費し、ウィンドウの集約を実行し、トピックの結果にプッシュして、ログに出力します。 どちらのアプリもFargateでホストされています。
ストリームプロデューサーアプリケーションは、Twitter API(サンプルツイートのストリーム)に接続し、ツイートのストリームを読み取り、ハッシュタグのみを抽出して、MSKトピックに公開します。 以下は、アプリケーションからのコードスニペットです。
MSKクラスターは2.6.1つのアベイラビリティーゾーンに分散しており、アベイラビリティーゾーンごとにXNUMXつのブローカーがあります。 AWSが推奨する(この記事の執筆時点で)バージョンのApache KafkaXNUMXを使用します。 Apache Kafkaトピックには、並列処理と復元力を活用するために、レプリケーション係数とXNUMXつのパーティションがあります。
コンシューマーストリーミングアプリのロジックは次のとおりです。 これは、1秒のウィンドウで20回以上言及された、最小長がXNUMXのTwitterハッシュタグの数をカウントします。
前提条件
前提条件として、必ず次の手順を実行してください。
- AWSアカウントを作成します。 この投稿では、必要なAWSリソースを
us-east-1
orus-west-2
領域。 サインアップしていない場合は、次のタスクを実行します。- アカウントを作成する。 手順については、を参照してください。 AWSにサインアップ.
- 作る AWS IDおよびアクセス管理 (IAM)ユーザー。 手順については、を参照してください。 IAMユーザーを作成する.
- Twitterアプリにベアラートークンを関連付けます。 開発者アカウントを作成するには、を参照してください Twitter開発者プラットフォームを始めましょう.
- インストールを開始する デッカー ローカルマシンで。
ソリューションの概要
このソリューションを実装するには、次の手順を実行します。
- MSKクラスターをセットアップして Amazon エラスティック コンテナ レジストリ (Amazon ECR)。
- アプリケーションJARファイルをビルドしてAmazonECRにアップロードします。
- Fargateタスクとサービス定義を使用してECSクラスターを作成します。
- ストリーミングアプリケーションを実行します。
MSKクラスターとAmazonECRをセットアップします
提供されているものを使用してください AWS CloudFormation template VPC(他の必要なネットワークコンポーネントを含む)、セキュリティグループ、必要なKafkaトピックを含むMSKクラスターを作成する(twitter_input
および twitter_output
)、およびXNUMXつのAmazon ECRリポジトリ(アプリケーションごとにXNUMXつ)。
アプリケーションJARファイルをビルドしてAmazonECRにアップロードします
JARファイルをビルドしてAmazonECRにアップロードするには、次の手順を実行します。
- からアプリケーションコードをダウンロードします GitHubの レポ。
- プロジェクトのルートで次のコマンドを実行して、アプリケーションをビルドします。
- Dockerイメージを作成します(
kafka-streams-msk
およびtwitter-stream-producer
):
- 認証トークンを取得し、Dockerクライアントをレジストリに対して認証します。 以下を使用してください AWSコマンドラインインターフェイス (AWS CLI)コード:
- 画像にタグを付けてAmazonECRリポジトリにプッシュします。
- 次のコマンドを実行して、画像をAmazonECRリポジトリにプッシュします。
これで、Amazon ECRリポジトリに画像が表示されます(次のスクリーンショットを参照)。
Fargateタスクとサービス定義を使用してECSクラスターを作成します
提供されているCloudFormationを使用する template ECSクラスター、Fargateタスク、およびサービス定義を作成します。 TwitterAPIベアラートークンを用意してください。
ストリーミングアプリケーションを実行します
CloudFormationスタックが完了すると、アプリケーションが自動的にデプロイされます。 約10分後、すべてのアプリが稼働し、集約され、結果が生成されます。 あなたはで結果を見ることができます アマゾンクラウドウォッチ ログまたはに移動して ログ Fargateタスクのタブ。
改善、考慮事項、およびベストプラクティス
このソリューションを実装するときは、次のことを考慮してください。
- Fargateを使用すると、クラスター内でタスク定義の指定された数のインスタンスを同時に実行および維持できます。 何らかの理由でタスクのいずれかが失敗または停止した場合、Fargateスケジューラーは、サービス内のタスクの必要な数を維持するために、タスク定義の別のインスタンスを起動してそれを置き換えます。 Fargateは、特権Docker権限を必要とするワークロード、または4vCPUまたは30Gbを超えるメモリを必要とするワークロードには推奨されません(ワークロードを、それぞれがより少ないリソースを使用する、より多くのより小さなコンテナーに分割できるかどうかを検討してください)。
- KafkaStreamsの復元力と可用性はによって提供されます 州の店。 これらの状態ストアは、インメモリハッシュマップ(この投稿で使用)、または別の便利なデータ構造(たとえば、本番環境で推奨されるRocksDBデータベース)のいずれかです。 Kafka Streamsアプリケーションは、APIを介してアクセスできる複数のローカル状態ストアを埋め込んで、処理に必要なデータを保存および照会することができます。 さらに、Kafka Streamsは、ローカルの州のストアが障害に対して堅牢であることを確認します。 状態ストアごとに、状態の更新を追跡する複製された変更ログKafkaトピックを維持します。 クラッシュ後にアプリが再起動すると、changelog Kafkaトピックが再生され、メモリ内の状態ストアが再作成されます。
- AWSGlueスキーマレジストリ はこの投稿の範囲外ですが、登録済みのApache Avroスキーマを使用してストリーミングデータの進化を一元的に検出、検証、および制御するために検討する必要があります。 それに伴う利点のいくつかは、データポリシーの実施、データの発見、制御されたスキーマの進化、およびフォールトトレラントなストリーミング(データ)パイプラインです。
- 可用性を向上させるには、リージョン内でXNUMXつ(この記事の執筆時点で最大)のアベイラビリティーゾーンレプリケーションを有効にします。 Amazon MSKはクラスターの状態を継続的に監視し、コンポーネントに障害が発生した場合、AmazonMSKが自動的にそれを置き換えます。
- MSKクラスターでXNUMXつのアベイラビリティーゾーンを有効にすると、可用性が向上するだけでなく、クラスターのパフォーマンスも向上します。 多数のブローカー間で負荷を分散し、トピックごとにより多くのパーティションを追加できます。
- 保管時の暗号化、転送中のTLS暗号化(クライアントからブローカー、ブローカーからブローカー)を有効にすることを強くお勧めします。 TLSベースの証明書認証, SASL / SCRAM認証、によって保護することができます AWSシークレットマネージャー.
クリーンアップ
リソースをクリーンアップするには、この投稿の一部として起動したCloudFormationスタックを削除します。 これらのリソースは、AWSCloudFormationコンソールまたは AWSコマンドラインインターフェイス (AWS CLI)。
まとめ
この投稿では、スケーラブルで復元力のあるリアルタイムストリーム処理アプリケーションを構築する方法を示しました。 Kafka Streams API、Amazon MSK、およびFargateを使用してソリューションを構築します。 また、改善、考慮事項、およびベストプラクティスについても説明しました。 このアーキテクチャは、移行または新しいワークロードの参照として使用できます。 それを試してみて、コメントであなたの経験を共有してください!
著者について
- &
- 11
- 9
- アクセス
- NEW
- 利点
- すべて
- Amazon
- 分析論
- アパッチ
- アパッチカフカ
- API
- API
- アプリ
- 申し込み
- アプリ
- 建築
- 周りに
- 認証
- 賃貸条件の詳細・契約費用のお見積り等について
- AWS
- 舞台裏で
- BEST
- ベストプラクティス
- ブローカー
- ブローカー
- ビルド
- 建物
- 容量
- 証明書
- コード
- 企業
- コンポーネント
- 計算
- consumer
- コンテナ
- コンテナ
- 続ける
- Crash
- データ
- データベース
- データベースを追加しました
- 設計
- Developer
- 開発
- Devices
- 発見
- デッカー
- 暗号化
- 環境
- イベント
- 進化
- 体験
- 抽出物
- ファイナンシャル
- フォーカス
- ハッシュ
- 健康
- 認定条件
- How To
- HTTPS
- IAM
- アイデンティティ
- 産業
- IOT
- IoTデバイス
- 分離
- IT
- キー
- 最新の
- 起動
- 主要な
- 学習
- 図書館
- LINE
- 負荷
- ローカル
- 長い
- 機械学習
- 地図
- 医療の
- ネットワーク
- 注文
- その他
- 患者
- 支払う
- パフォーマンス
- プラットフォーム
- ポッド
- 方針
- 電力
- プライベート
- プロデューサー
- 生産
- プロジェクト
- 範囲
- への
- リソース
- REST
- 結果
- ラン
- ランニング
- 規模
- セキュリティ
- サーバレス
- サービス
- セッションに
- シェアする
- So
- 広がる
- 開始
- 都道府県
- 店舗
- 店舗
- ストリーミング
- TLS
- トークン
- トピック
- 追跡する
- 取引
- トランジット
- さえずり
- 更新版
- 車
- 以内
- 作品
- 書き込み