原文はこちら。
https://community.oracle.com/community/cloud_computing/oracle-cloud-developer-solutions/blog/2017/01/05/microservices-messaging-on-oracle-cloud-using-apache-kafka
この記事は2部構成の第1部です。Oracle Cloud Platformと幅広く使われているオープンソーステクノロジーを組み合わせ、サンプルアプリケーション(ソースコードは
こちら)を使って、マイクロサービス間を疎結合の非同期メッセージベースで連携する方法を紹介します。
Cloud Developer Portal Solutions
https://cloud.oracle.com/developer/solutions
第1部では、以下の内容を取り扱います。
- 個々のマイクロサービスの開発
- 非同期メッセージングを利用した疎結合連携
- Oracle Cloud Serviceのセットアップおよびデプロイ
第2部ではマイクロサービスのスケールについて取り扱います。
利用するコンポーネント
Oracle Cloud
以下のOracle Cloud Serviceを使います。
オープンソース
以下のオープンソーステクノロジーを使ってサンプルアプリケーションを構築します。
Microservicesにおけるメッセージング
マイクロサービスベースのシステムは複数のアプリケーション(サービス)から構成されており、これらのサービスは、通常システム全体の特殊な側面(ビジネスシナリオ)にフォーカスしています。個々のサービスは、相互に連携せず独立して機能することもできますが、実際のところサービスは孤立して機能することはできず、お互いに通信して仕事を終わらせる必要があります。マイクロサービス間の通信を実装する上で複数の戦略を使うことができ、同期 vs 非同期スタイル、コレオグラフィ vs オーケストレーション、REST(HTTP)vs メッセージングといったくくりで分類されることがよくあります。
サンプルアプリケーションのアーキテクチャ
この記事のサンプルアプリケーションのユースケースはシンプルで、ランダムに生成されたデータ(Producerマイクロサービス)を別のエンティティ(Consumerマイクロサービス)が受け取り、最終的にはリアルタイムでユーザーが確認できるようにします。
第1部では、高可用性の設定を考慮しておらず、1個のKafkaノードです。つまり、Kafkaクラスタに1個のサーバのみ存在し、ProducerとConsumerマイクロサービス(両方とも1インスタンスのみ)をApplication Container Cloudにデプロイします。
上図で図示されている個々のコンポーネントを見ていきましょう。
Apache Kafka
Apache Kafkaは、「メッセージングシステムまたは分散コミットログとして実装されたストリーミングプラットフォーム」として広く知られています。もっと簡単な説明があればいいのですが。
- 基本
KafkaはJVM上で動作するScalaで記述された、Publish-Subscribeベースのメッセージングシステムで、パブリッシャ(Publisher)がトピック(Topic)にデータを書き込み、コンシューマ(Consumer)がこれらのトピックをポーリングしてデータを取得します。
- 分散指向
コンポーネント(broker、パブリッシャ、コンシューマ)はスケールアウト可能なように設計されています。
- マスタ・スレーブアーキテクチャ
トピック内のデータはクラスタ内の複数ノード間に(レプリケーション・ファクタに基づいて)分散されます。1個のノードのみが特定データのマスタとして機能し、0個以上のノードがそのデータのコピーを持つ、つまりフォロワーとして動作することができます。
- パーティション
トピックはさらにパーティション(Partition)に分割され、各パーティションは基本的にデータ(Key-Valueのペア)を格納するコミット・ログ(commit log)として振る舞います。データは不変で、厳密に順序付け(オフセットが各データエントリに割り当てられます)された上で、(構成に基づいて)永続化され、ディスクに保持されます。
- 適した領域
Kafkaは大容量の高速なリアルタイムストリーミングデータの処理に適しています。
- JMSではない
JMSと似てはいますが違います。JMS仕様を実装したものではなく、JMSベースのソリューションを置き換えるようなものではありません。
Kafka brokerはKafkaサーバプロセス(ノード)以外の何者でもありません。複数のノードで、分散された、フォールトトレラントおよび水平方向にスケール可能なメッセージハブとして機能するクラスタを構成することができます。
Producerマイクロサービス
Producerマイクロサービスは、Kafka Java APIとJersey (JAX-RS参照実装)を使います。リアルタイムデータのPub-Subパイプラインを紹介するため、このマイクロサービスはハイペースでサンプルイベントを発行します。
サンプルデータ
Producerが生成したデータをメトリックのモデルとします。今回は、特定のマシンのCPU使用率であり、単純なキーと値のペア(名前、使用率など)と見做すことができます。 以下はその例です(属性Partitionは無視してください)
: Partition 0
event: machine-2
id: 19
data: 14%
: Partition 1
event: machine-1
id: 20
data: 5%
Consumerマイクロサービス
このシステムの2個目のマイクロサービスです。Producerと同様に、JerseyとKafkaのJava (Consumer) APIを使っています。このサービスで利用しているJerseyコンポーネントの中で注目すべきは、Server Sent Eventsモジュールです。このモジュールを使うと、サンプルアプリケーションで必要なsubscribe-and-broadcastセマンティクスを実装が容易になります(詳細は後で説明します)。
両マイクロサービスは個別のアプリケーションとしてApplication Container Cloud Serviceにデプロイされ、独立して管理、スケールすることができます。
Oracle Compute CloudへのApache Kafkaの構築
複数の方法でApache KafkaをOracle Compute Cloud (IaaS)上に構築できます。
Oracle Cloud Marketplaceの利用
マーケットプレイスのApache KafkaのBitnamiイメージを使います。詳細ドキュメントは以下のURLをご覧ください。
Oracle® Cloud
Using Oracle Compute Cloud Service (IaaS)
Creating an Instance from Oracle Cloud Marketplace
http://docs.oracle.com/cloud/latest/stcomputecs/STCSG/GUID-D7CE3F3E-CC1B-443B-BAF9-E0F6B4FD7762.htm#STCSG-GUID-D7CE3F3E-CC1B-443B-BAF9-E0F6B4FD7762
Oracle Compute CloudのVMを使う
お好みのOSが動作するCompute CloudのVMをプロビジョニングするところから始めましょう。以下のドキュメントが役にたつことでしょう。
Oracle® Cloud
Using Oracle Compute Cloud Service (IaaS)
Workflow for Creating Your First Instance
http://docs.oracle.com/cloud/latest/stcomputecs/STCSG/GUID-DD966FCF-624B-45A3-88AF-8EC123CCBCFC.htm#STCSG-GUID-DD966FCF-624B-45A3-88AF-8EC123CCBCFC
VMへのSSHアクセスの有効化
構成を進める上で、まずOracle Compute Cloud VMへのSSHアクセスを有効化(セキュリティポリシーやセキュリティルールを作成)する必要があります。Oracle LinuxやOracle SolarisベースのVMに対する手順はそれぞれ以下を確認してください。
Oracle® Cloud
Using Oracle Compute Cloud Service (IaaS)
Accessing an Oracle Linux Instance Using SSH
http://docs.oracle.com/cloud/latest/stcomputecs/STCSG/GUID-D947E2CC-0D4C-43F4-B2A9-A517037D6C11.htm#STCSG-GUID-D947E2CC-0D4C-43F4-B2A9-A517037D6C11
Accessing an Oracle Solaris Instance Using SSH
http://docs.oracle.com/cloud/latest/stcomputecs/STCSG/GUID-C72CC64D-73C7-4150-8B5A-EB39D7F72327.htm#STCSG-GUID-C72CC64D-73C7-4150-8B5A-EB39D7F72327
VMへのKafkaのインストール
このセクションではOracle LinuxベースのVMを前提としています。
以下のコマンドを使います。
sudo yum install java-1.8.0-openjdk
sudo yum install wget
mkdir -p ~/kafka-download
wget "http://redrockdigimark.com/apachemirror/kafka/0.10.1.0/kafka_2.11-0.10.1.0.tgz" -O ~/kafka-download/kafka-binary.tgz
mkdir -p ~/kafka-install && cd ~/kafka-install
tar -xvzf ~/kafka-download/kafka-binary.tgz --strip 1
Kafkaのリスナーポートをオープン
Oracle Application Container Cloudにデプロイされたマイクロサービスのため、Kafka brokerサービスへのアクセス(今回の場合は9092/tcp)を許可する必要があります。以下のドキュメントにユースケースの形ですばらしいリファレンスが提供されています。
Oracle® Cloud
Using Oracle Compute Cloud Service (IaaS)
Setting Up Firewalls and Opening Ports for a Sample Scenario
http://docs.oracle.com/cloud/latest/stcomputecs/STCSG/GUID-DE568AAF-39CE-462C-B605-B96AE4036825.htm#OCSUG292
Security Applicationを作成し、プロトコルと対応するポートを指定します。詳細は以下のドキュメントをご覧ください。
Oracle® Cloud
Using Oracle Compute Cloud Service (IaaS)
Creating a Security Application
http://docs.oracle.com/cloud/latest/stcomputecs/STCSG/GUID-4B073EF2-0D7C-4AD8-A40A-585C4E2F938C.htm#OCSUG183
先ほど作成したSecurity Applicationを参照し、Security Ruleを作成します。この設定で、(ルールで定義したように)パブリック・インターネットから9092/tcpへのトラフィックを(Security Application構成毎に)許可します。詳細は以下のドキュメントを参照してください。
Oracle® Cloud
Using Oracle Compute Cloud Service (IaaS)
Creating a Security Rule
http://docs.oracle.com/cloud/latest/stcomputecs/STCSG/GUID-630622EC-160B-4523-88AD-F7B46463A0BE.htm#GUID-A3BEB363-C262-4F1E-909D-AA73AF2D65C9
以下のような構成になるはずです。
Kafka brokerの構成
Compute Cloud環境用にKafkaサーバのプロパティファイル(<KAFKA_INSTALL>/config/server.properties)中の以下の属性を編集します。
Compute CloudインスタンスのパブリックDNS
パブリックIPが140.44.88.200であれば、パブリックIPに対応するFQDNはoc-140-44-88-200.compute.oraclecloud.comになります。
属性 | 値 |
listeners | PLAINTEXT://<oracle-compute-private-IP>:<kafka-listen-port>
(例) PLAINTEXT://10.190.210.199:9092 |
advertised.listeners | PLAINTEXT://<oracle-compute-public-DNS>:<kafka-listen-port>
(例) PLAINTEXT://oc-140-44-88-200.compute.oraclecloud.com:9092 |
以下は
server.properties
ファイルのスナップショットです。
KAFKA_INSTALL/bin/zookeeper-server-start.sh config/zookeeper.properties
を実行して、Zookeeperを起動します。
KAFKA_INSTALL/bin/kafka-server-start.sh config/server.properties
を実行して、Kafka Brokerを起動します。
Zookeeper起動前にKafka brokerを起動しないでください。
ソリューション概要
イベントフロー・シーケンス
これらのコンポーネントが協調してどのようにユースケース全体をサポートするのかを説明します。
ProducerがKafka brokerにイベントをPush
Consumer側では…
- アプリケーションはデータ取得のためKafka brokerをポーリングします(KafkaではPoll/Pullモデルが使われています。よく見られるPushモデルではありません)
- (ブラウザやHTTPクライアントといった)クライアントはシンプルなHTTP GETリクエストを特定のURL(例:https://<acc-app-url>/metrics)に送信してイベントをサブスクライブします。アプリケーション内でイベントが発生したときにクライアントがイベントを取得するという、1回のサブスクライブ後、クライアントはいつでも切断することができます。
非同期+疎結合
Consumerがメトリックデータを生成します。あるConsumerがブラウザベースのクライアント用のリアルタイムフィードを利用可能にしますが、同じデータに関する異なるビジネスロジックが実装されている複数のConsumer(例えば、処理や分析用途のためにメトリックデータを永続データストアにプッシュする)が存在する可能性があります。
Server Sent Events (SSE)について
SSEは、HTTPとWebSocketの中間的なものです。クライアントは接続リクエストを送信し、接続が確立すると接続は開いたままになり、サーバからデータを引き続き受信できます。
- これは、サーバーのポーリングを回避できるため、単一のリクエストごとにHTTPリクエスト/レスポンスを張る方式に比べて効率的です。
- 全二重であるWebSocketとは異なります。具体的には、クライアントとサーバは、接続が確立された後はいつでもメッセージを交換することができるますが、SSEでは、クライアントはリクエストを1回だけ送信します。
このモデルは、クライアントが接続してデータが到着するのを待つだけなので、今回のサンプルアプリケーションに適しています(サブスクリプションした後は、サーバーと対話する必要がないためです)。
スケーラビリティ
高いスループットとパフォーマンスを維持するために、このシステムのすべての部分がステートレスで水平方向にスケール可能であることに注意する必要があります。第2部では、スケーラビリティの側面をさらに深く理解し、Application Container Cloud Serviceで簡単にスケールさせる方法について説明します。
コード
このセクションでは、このサンプルで使った両方のマイクロサービスのコードの概要と重要なポイントを紹介します。
Producer マイクロサービス
アプリケーションのブートストラップ、イベント生成などを処理するクラスで構成されています。
Class | Details |
ProducerBootstrap.java | アプリケーションのエントリポイント。
Grizzlyコンテナを起動する。 |
Producer.java | 専用スレッドで動作。
イベント生成の中核ロジックを含む。 |
ProducerManagerResource.java | producerプロセスの開始・停止のためのHTTP(s)エンドポイントを公開。 |
ProducerLifecycleManager.java | ExecutorServiceを使いProducerスレッドを管理するロジックを実装(ProducerManagerResourceが内部で利用)。 |
Consumer マイクロサービス
Class | Details |
ConsumerBootstrap.java | アプリケーションのエントリポイント。
Grizzlyコンテナを起動してConsumerプロセスを呼び出す。 |
Consumer.java | 専用スレッドで動作。
イベント消費の中核ロジックを含む。 |
ConsumerEventResource.java | エンドユーザーがイベント消費するためのHTTP(s)エンドポイントを公開。 |
EventCoordinator.java | イベントサブスクリプションおよびブロードキャスティングを実装するためのJersey SSEBroadcasterのラッパー(ConsumerEventResourceが内部で利用)。 |
以下の理由で、
Jersey SSE Broadcaster を使っています。
- クライアントの統計情報を追跡する
- クライアント接続が切断されると自動的にサーバーリソースを破棄する
- スレッドセーフである
Oracle Application Container Cloudへのデプロイ
ここまででアプリケーションについて情報を得たので、ビルド、パッケージング、デプロイについて見ていきます。
まずソースコード (zipファイル) をダウンロードしてください。このエントリの最初(原文からダウンロードする場合は最後)にリンクがあります。
Metadata files
manifest.json:
このファイルはいじらずにそのまま使うことができます。
deployment.json
Kafka brokerに対応する環境変数が含まれています。値はデプロイ前にユーザーが埋めるためにプレースホルダーとして残してあります。
{
"environment": {
"KAFKA_CLUSTER":"<as-configured-in-kafka-server-properties>"
}
}
この値(Oracle Compute CloudインスタンスのパブリックDNSのFQDN)は、Kafkaの server.properties ファイル中の属性 advertised.listeners で構成した値と一致する必要があります。
metadataファイルの詳細は、以下のドキュメントをご覧ください。
Oracle® Cloud Developing for Oracle Application Container Cloud Service
Creating Metadata Files
http://docs.oracle.com/en/cloud/paas/app-container-cloud/dvcjv/creating-meta-data-files.html
Build & zip
manifest.json ファイルだけを使ってJARをビルドしてZipで圧縮し、クラウドにデプロイできるアーティファクトを生成します。
Producerアプリケーション
cd <download_dir>/producer // mavenプロジェクトの展開場所
mvn clean install
zip accs-kafka-producer.zip manifest.json target/accs-kafka-producer.jar //you can also use tar to create a tgz file
Consumerアプリケーション
cd <download_dir>/consumer //mavenプロジェクトの展開場所
mvn clean install
zip accs-kafka-consumer.zip manifest.json target/accs-kafka-consumer.jar
アプリケーションのzipファイルをOracle Storage cloudへアップロード
まずOracle Storage Cloudに先ほど作成したアプリケーションのZipファイルをアップロードした上で、後続の手順でそのZipファイルを参照していきます。以下は必要なcURLコマンドです。
(まだ作成していない場合は)Oracle Storage Cloudにコンテナを作成する
curl -i -X PUT -u <USER_ID>:<USER_PASSWORD> <STORAGE_CLOUD_CONTAINER_URL>
(例) curl -X PUT –u jdoe:foobar "https://domain007.storage.oraclecloud.com/v1/Storage-domain007/accs-kafka-consumer/"
Zipファイルをコンテナにアップロード(ZipファイルはStorage Cloudのオブジェクトにすぎません)
curl -X PUT -u <USER_ID>:<USER_PASSWORD> <STORAGE_CLOUD_CONTAINER_URL> -T <zip_file> "<storage_cloud_object_URL>" //template
(例) curl -X PUT –u jdoe:foobar -T accs-kafka-consumer.zip "https://domain007.storage.oraclecloud.com/v1/Storage-domain007/accs-kafka-consumer/accs-kafka-consumer.zip"
Producerマイクロサービスについても同じ手順を繰り返してください。
Application Container Cloudへのデプロイ
Zipファイルのアップロードが完了したら、アプリケーションをデプロイするために利用するApplication Container Cloud REST APIを使って当該ZipファイルのOracle Storage Cloudにおけるパスを参照することができます。以下はREST APIを使うcURLコマンドの例です。
curl -X POST -u joe@example.com:password \
-H "X-ID-TENANT-NAME:domain007" \
-H "Content-Type: multipart/form-data" -F "name=accs-kafka-consumer" \
-F "runtime=java" -F "subscription=Monthly" \
-F "deployment=@deployment.json" \
-F "archiveURL=accs-kafka-consumer/accs-kafka-consumer.zip" \
-F "notes=notes for deployment" \
https://apaas.oraclecloud.com/paas/service/apaas/api/v1.1/apps/domain007
Producerマイクロサービスについても同じ手順を繰り返してください。
デプロイ後
Container Cloudコンソ―ルの
Applicationsセクションの下にマイクロサービスを確認できるはずです。
特定のアプリケーションの詳細情報を見ると、環境変数も存在するはずです。
アプリケーションのテスト
Producer
accs-kafka-producer マイクロサービスのために、Kafka Producerプロセス(スレッド)をユーザーが開始する必要があります(これは単に柔軟性を提供するためです)。下表に記載の適切なコマンドを(
cURLや
Postmanなどを使って)発行し、Producerプロセスを管理します。
Producerを開始した後は、停止するまでイベントを発行し続けます。
Consumer
accs-kafka-consumer マイクロサービスでは、Kafka Consumerプロセスはアプリケーションとともに起動します。つまりメトリック収集のためにKafka brokerへのポーリングを開始します。前述の通り、Consumerアプリケーションは、リアルタイムにメトリックデータを見るための(Server Sent Eventsを使った)HTTP(s)エンドポイントを提供しています。
下図のようなリアルタイムデータストリームを確認できるはずです。属性
eventはマシン名とID、属性
dataはCPU使用率を表します。
属性Partitionは無視してください。これはスケーラビリティや負荷分散といった第2部で説明予定です。
参考資料
第2部はまた後ほど。