[Kubernetes] Hashicorp ValutとOKEとの統合/Integrating Hashicorp Vault with OKE (Oracle Container Engine for Kubernetes)

原文はこちら

ようこそ、ここではVaultとOracle Container Engine for Kubernetes(OKE)のインテグレーションのご紹介します。

前書き

まずSecret管理のためにデザインされた包括的なツールであるHashicorp Vaultをご紹介さします。
次に、Kubernetes Service Account Tokenを使ってVaultとの認証を円滑化するために使われるVault Kubernetes Auth Methodの概要をご説明します。この認証のやり方はKubernetesネイティブのIdentityとアクセス管理を活用しつつ、VaultトークンのKubernetes Podへの導入を容易にしています。
最後に、OKE上へのVaultのデプロイメントシナリオの概要をご説明します。そこではVaultがOKEにデプロイされており、さらにVault Kubernetes Auth Methodを使用してVaultがOKEクラスターのコントロールプレーンにインテグレーションされています。このデプロイメントシナリオは、ステップバイステップガイドおよび必要となる設定ファイルを含めて、この作業マニュアルに詳細が記載されています。
Oracle Cloud Infrastructure Container Engine for Kubernetes(多くの場合「OKE」と省略されます)はフルマネージド、スケーラブルで高可用性をもったサービスで、みなさんのコンテナ化されたアプリケーションをクラウドにデプロイするために使えます。みなさんの開発チームがクラウドネイティブアプリケーションを高信頼にビルド、デプロイそして管理するためにOKEを使っていただけます。OKEが使っているKubernetesは、クラスター化されたホストにまたがったコンテナ化されたアプリケーションのデプロイとスケーリング、管理を自動化するオープンソースのシステムです。

Vault

Secret管理はVaultの主要ユースケースのひとつです。ここでSecretというのは何かしら厳密にアクセスを制限したいもの、たとえばAPIキーやパスワード、証明書などです。VaultはこうしたSecretについて種類を問わず統一したインターフェースを提供し、そしてAPIドリヴンフレームワークによる緊密なアクセス制御やバージョン管理、詳細な監査ログなどの重要となる機能も提供しています。
多くの組織で、認証情報をソースコードにハードコーディングしてしまっており、それらはいくつかの設定ファイルや設定管理ツールに散らばっていたり、平文のままバージョン管理ツールやWiki、共有ストレージに保存されてしまっています。Vaultはこうした認証情報を保持するための中心地を提供し、それらが暗号化され、アクセスが監査ログに記録され、資格をもったクライアントのみが見ることができることを確実にします。
ValutはSecret管理、データ保護、Identityベースのアクセス、コラボレーションとオペレーション、ガバナンス、そしてコンプライアンスについての広範な機能を提供しています。
VaultはAcrive DirectoryやLDAP、Kubernetes、そしてクラウドプラットフォームなどの信頼済のIdentityソースに対して認証を行うことで、Secretや暗号鍵へのアクセスを緊密に制御することができます。Vaultにより、ユーザーやアプリケーションにSecretや鍵へのアクセス許可を行ううえでの、細かい粒度での認証が可能になります。

Kubernetes Auth Method

Vaultが用いられている場合、アクターにとって、Secretの格納/取り出しおよび暗号学的な操作に先立って認証の成功が必要となります。Vaultでの認証のコアはトークンです。これはつまり、Secretの利用者はまず有効なトークンを獲得する必要があるということです。Vaultの認証プロセスはクライアントのIdentity(Secretの利用者)を検証し、そしてこのIdentityに紐付いたトークンを生成します。
様々なプラットフォームで稼働するアプリケーションの要件を満たすため、Vaultはいくつかの認証メソッドを提供しています。Kubernetes Auth MethodはOKEのようなKubernetesベースのオーケストレーターとうまく組み合わせることができます。
Kubernetes Auth Methodは、Kubernetes Service Account Tokenを用いてVaultと組み合わせての認証を実現します。このPodのサービスアカウントのためのトークンは、Podが初期化される際にPodの中の`/var/run/secrets/kubernetes.io/serviceaccount/token`に自動的にマウントされます。Kubernetes Auth Methodを用いる場合にVaultに認証用に送られるのはこのトークンです。
Kubernetes TokenReview APIに対してのアクセス権限をもったサービスアカウントとVaultとの設定をしておきます。Vaultは、SecretにアクセスするためにVaultに接続を試みているPodが提示してきたサービスアカウントトークンを検証するために、Kubernetes API Serverに対して認証済のコールを行うためにこのサービスアカウントを利用します。
このVaultデプロイメントシナリオの図は前述の作業マニュアルの中で実装されるソリューションアーキテクチャを説明しています。
オレンジで表されている2番のフローは、Kubernetes Auth Methodを用いて実装される認証フローの概要を示しています。
成功裏に完了すると、Vaultは当該アプリケーションに予め設定されたポリシーをつけてトークンを返します。その後のStep 3以降では、このアプリケーションはこのトークンを使ってVaultのKey/Value SecretエンジンからSecretを取得することができます。
デプロイメントシナリオ
以下は前述のデプロイメントシナリオでプロビジョニングされるコンポーネントの概覧です:
  • OKE Kubernetes cluster
    • etcdおよびVault Kubernetesオペレーター
    • Vaultクラスター
      • Kubernetes Auth Methodを使うよう設定されたVault
      • Vault Key/Value (KV)ストア `secret/testapp`
      • デフォルトネームスペース内のKubernetesサービスアカウント'testapp'に紐付けられたVaultロール'testapp'
      • 'testapp'ロールに紐付けられたVaultポリシー'testapp-kv-crud' (`secret/testapp` KVストアへのCRUDアクセスを提供)
    • Vaultクラスターのための永続ストレージとして機能するetcdクラスター
    • `testapp`サービスアカウントを使用し、(Kubernetes認証を通じて)Vaultと認証されるテストアプリケーション
    • Vaultの`secret/testapp` KVストアでSecretを作成/取得するテストアプリケーション
etcd、およびVaultのクラスターはそれぞれのオペレーターによって作成されます。VaultオペレーターはVaultクラスターをKubernetes上にデプロイし、管理します。Vaultオペレーターによって作成されたVaultインスタンスは高い可用性を持ち、自動フェイルオーバとアップグレードをサポートしています。それぞれのVaultクラスターのために、Vaultオペレーターは永続ストレージバックエンドとしてetcdクラスターも作成します。
以下は、作業マニュアルで説明されている、VaultのインストールおよびOKEとのインテグレーションの手順の概観です:
  1. Vault & etcdオペレーターのデプロイ
  2. Vault & etcdクラスターのデプロイ
  3. Vault Kubernetes認証の設定
  4. Vault Key/Value (KV)ストア、およびテストアプリケーションに紐付けられたポリシーの作成
  5. テストアプリケーションのデプロイと、Kubernetes認証を使ったVaultへの認証
  6. KVストアへのSecretの作成、およびストアからの読み取り
この作業マニュアルは、OKEでの高可用デプロイメントについて素早く学び、活用するための素晴らしい方法です。

まとめ

Kubenetes Auth Methodを使ったVaultの認証とKubernetesのインテグレーションにより、Kubernetes Service Account Tokenを使ったVaultクライアントの認証プロセスをシンプルにすることができます。
一方で、カスタムアプリケーションロジックを書くことなく標準的なやりかたでトークンのライフサイクルを管理するためには、いくつかの課題が残っています。Vaultのチームはこれらの課題を解決するため、いくつかの機能を検討しています。
提案されている機構のひとつには、周期的に同期プロセスを実行することによってVaultとKubernetes Secrets機構をインテグレーションするものがあります。他にも、Container Storage Interfaceプラグインによって稼働中のPodにSecretをインジェクトするといったものもあります。さらに、サイドカーコンテナ経由でPodにVault Secretをインジェクトする、というものも。それぞれ、複雑性の解消のためのアプローチとして非常に興味深いです。
Kubernetesとクラウドネイティブの領域はとても動きが速いです。ソリューション、チュートリアル、ベストプラクティスなどを共有し、インスピレーションを得たり、ハンズオンを行ったり、産業のポテンシャルを解放することを支援しながら、この領域の進歩を観察していきたいと思います。

[Machine Learning] Oracle CloudでAutoMLを使って機械学習/Hide Machine Learning Complexity Using AutoML in Oracle Cloud

原文はこちら


 データは新たな石油であり、データサイエンティストは未来の職種です。わたしたち誰もがこの職種について聞いたことがありますが、企業にとってはこの職種を雇うのはいまだ簡単ではないようです。十分な経験を持ち合わせていないわたしたちが機械学習を始めるにはどのようにしたらいいでしょうか?

機械学習におけるチャレンジ

機械学習のユースケースのうち、画像分類や価格予想、異常検知などのいくつかのものはとてもシンプルです。しかしこうしたユースケースについても、ニューロンネットワークの知識を持った専門のデータサイエンティストに、結果の改善とチューニングの手助けをしてもらう必要があります。機械学習の主なタスクとして、以下が必要となります:
  • データの前処理とクリーニング
  • 適切な機能の選定と構成
  • 適切なモデル・ファミリーの選定
  • モデルのハイパーパラメータの最適化
  • 機械学習モデルの後処理
  • 得られた結果の批判的分析
Depending on the business problem, it can take up to hundreds of experiments until we reach the solution. This is for experienced data scientists -- imagine for a non-expert!
ビジネス上の課題によっては、解決にたどり着くまでに何百もの実験が必要になります。ここで経験豊富なデータサイエンティストの出番というわけです。非ー専門家には難しすぎます! 

AutoMLとは?

AutoMLというアイディアとは、ニューロンネットワークの改善に機械学習を使うということです。ユーザーによって指定されたタイムフレームの中で、学習とチューニングを自動的に行うのがAutoMLの役目となります。このとき複雑さはすべてシンプルなAPIやフレームワークの裏に隠蔽され、機械学習モデルをわずか数行のコードで作成できるようにしてくれるわけです。
エンタープライズ企業にとってAutoMLは以下をはじめ多くの利点をもたらします:
  • 迅速なマーケット投入…あなたのデータサイエンティストはパラメータ調整やルーチン作業から解放され、課題に集中できるようになります
  • 誰でも機械学習を始められる…機械学習を始めるのに専門家になる必要はありません
すごいですね!このAutoMLの選択肢としてGoogleが提供するサービスが ありますが、1時間あたり20ドル程度のコストがかかります!ならオープンソースコミュニティを検討してみませんか?

Oracle CloudでAuto-Kerasを実行する

Auto-Kerasはオープンソースのpythonのツールで、Kerasをベースにしています。Texas A&M UniversityのData Labで開発されました。Auto-Kerasはあなたのディープラーニングモデルのための正しいアーキテクチャとハイパーパラメータを自動的にサーチします。インストールも実行も簡単で、成長中のコミュニティからは多くの例が提供されています。
Auto-KerasをOracle Cloudでインストールして使ってみましょう!
どうしてOracle Cloudかって?多くの理由があります。テストや開発をしたい、あるいは商用稼働のための環境がほしい、そのどちらにもソリューションをご用意しています。またOracleは商用環境のためのGPUマシンも提供しています。
OracleはFree Tierを提供しており、Autonomous DatabaseとComputeのリソースを時間制限なしで使えます!そうです、時間制限なしにです!ここで無料アカウントをゲットして始めましょう!
ここでの例では、手書きの数字データを収めたMNISTデータベースを使います。このデータベースは各桁についてラベルも含んでいます。目標は数字の認識です。KerasとAuto-Kerasを比べてみることもできますね。Kerasでは71行のコードが必要となります(ここでチェックできます)。
ではこれをAuto-Kerasで実行してみましょう。ここではFree TierのシンプルなComputeマシンを使ってこの例を実行していきます。
Auto-Kerasはまだプレリリースバージョンであることには留意しましょうね。正式バージョンがすぐにリリースされるように願っておきましょう。
まず、前提となるpython 3.6をインストールする必要があります: 
$ yum install python36
Auto-Kerasのインストールはすごく簡単で、以下を実行するだけです:
$ pip3 install autokeras 
準備は万端となったので、例の実行のために以下のコードを実行しましょう: 
from tensorflow.keras.datasets import mnist
from autokeras.image.image_supervised import ImageClassifier

(x_train, y_train), (x_test, y_test) = mnist.load_data()
x_train = x_train.reshape(x_train.shape + (1,))
x_test = x_test.reshape(x_test.shape + (1,))
clf = ImageClassifier(verbose=True, augment=False)
clf.fit(x_train, y_train, time_limit=1 * 60 * 60)
clf.final_fit(x_train, y_train, x_test, y_test, retrain=True)
y = clf.evaluate(x_test, y_test)
print(y * 100)

モデルの比較とチューニングが行われ、過程と結果とが表示されます:

Saving model.
+--------------------------------------------------------------------------+
|        Model ID        |          Loss          |      Metric Value      |
+--------------------------------------------------------------------------+
|           0            |  0.14653808772563934   |   0.9875999999999999   |
+--------------------------------------------------------------------------+


+----------------------------------------------+
|               Training model 1               |
+----------------------------------------------+
Epoch-1, Current Metric - 0:  13%|███                         | 60/465 [00:55<06:12,  1.09 batch/s]
Epoch-2, Current Metric - 0.98:  75%|██████████████████      | 350/465 [05:49<01:57,  1.03s/ batch]
Epoch-4, Current Metric - 0.992:  52%|████████████           | 240/465 [04:08<03:58,  1.06s/ batch]

最終的に、98.65の精度を得ることができました。これで終わりです!
AIおよび機械学習についてもっと知りたい方は、Oracle AIのページも見てみてください。

[Streaming] KafkaワークロードのOracle Cloud Streamingへの移行/Migrate Your Kafka Workloads To Oracle Cloud Streaming

原文はこちら

あるホスティング環境から別の環境へのアプリケーションとワークロードの移行はともするとたいへんな道のりになります。過去15年のあいだ、わたしは何度もなんとか移行をしなければならないことがあり、それらは完全に平易なものでも楽しいものでもありませんでした。そこでは避けがたく予期していなかった問題が生じ、時には単純なアップグレードですら終わりなき頭痛を引き起こすこともありました。なので、Oracle Cloudへアプリケーションを移行しなければならない開発者、そしてDevOpsエンジニアのみなさんには同情します。同時に、アプリケーションをOracle Cloudへ移行することによって得られる利益はいつだって不利益を上回るとも考えています。このような移行の労苦については多くの検討が内部でなされており、数あるツール、また、プロセスをより簡単で苦労を少なくするためのちょっとした「ワザ」が開発されています。このポストではそうした「ワザ」のうちひとつについてお話します。

Kafkaは疑いなくデータストリーミング(とその他)でポピュラーで、それは性能がよく、信頼でき、利用を容易にするためのSDK実装がいくつもあるからです。あなたのアプリケーションもすでにKafkaを利用しているかもしれません。おそらくあなたはあるマイクロサービスでメッセージをProduceし、他のマイクロサービスでConsumeしているかも。ではなぜこのような目的のために、KafkaではなくOracle Streaming Service(OSS)を使うことを検討するべきなのでしょうか?わたしの経験上、ZookeeperとKafkaクラスターをホストするためのインフラを構築し、保守するのは大変な苦労(とコスト)を伴い、したがってあなたは深い知識と構成管理のための余分な時間が必要になります。その代わりに、すぐに使い出せるホステッド環境を提供するOSSのようなサービスを活用することで、そうした時間(とコストの一部)が不要になります。このブログポストでは、あなたのアプリケーションでKafka SDK for Javaを使うことでOSSをかんたんに使えるということをご説明していきます。このKafka互換機能は現状ではLimited Availabilityでの提供となっていますが、遠くないうちにGAとなる予定です。

Streamの構成

まず初めに、このデモのためにちゃちゃっとStreamトピックを構成しちゃいましょう。Oracle Cloudのダッシュボードコンソールから、'Analytics' -> 'Streaming'と選択します。
Streamのリストページで、'Create Stream'をクリック(必要に応じてこのStreamを配置するのに適切なコンパートメントを左のサイドバーから選択しておいてください)。
Create Streamダイアログで、Streamの名前をつけ、Retentionの値(あるメッセージがトピックから破棄されるまでの時間)を入力します。
希望のパーティション数を入力して、'Create Stream'をクリック。
新しいStreamの詳細ページに直接遷移し、おおよそ30秒でStreamの状態がActiveと表示されるでしょう。

Streams Userの作成

次にやるのはStreamingサービス専用のユーザーの作成です。コンソールサイドバーメニューからIdentity配下のUsersをクリック。
'Create User'をクリックして出てくるダイアログを埋めます。
新しいユーザーが作成されたら、ユーザー詳細ページに行ってAuth Tokenを生成します。
Tokenをコピーしてしっかり保存しておきましょう。このGenerate Tokenダイアログをいったん離れると取得することはできません。Identityコンソールにいるはずなので、サイドバーからCompartmentsをクリックしてStreamを作成したコンパートメントを見つけてください。そのコンパートメントのOCISをコピーし、これもあとで使うので保存しておいてください。
さて、ここからグループを作成し、先ほど作成したユーザーをこのグループに追加し、グループポリシーを作成します。
ここまでAuth Tokenを持ち、Streamを利用できるグループに所属したユーザーが用意できました。やったぜ。また、コンパートメントのOCIDも控えてあるので、コードに進んでいく準備ができています。

Kafka Producerの作成

ここまででKafka Client SDKを使ってこのStreamを利用するコードへと進む準備ができました。わたしたちのプロジェクトでの最初のステップはClient SDKの依存するライブラリが存在することの確認です。わたしはGradleを使っていますが、あなたが他のものを使っているのであれば適切に修正してください。
plugins {
    id 'java'
    id 'application'
}
group 'codes.recursive'
version '0.1-SNAPSHOT'
sourceCompatibility = 1.8
repositories {
    mavenCentral()
}
application {
    mainClassName = 'codes.recursive.KafkaProducerExample'
}
dependencies {
    compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.3.0'
    testCompile group: 'junit', name: 'junit', version: '4.12'
}
tasks.withType(JavaExec) {
    systemProperties System.properties
}
view rawbuild.gradle hosted with ❤ by GitHub
ここでわたしはProducerをテストするために以下のような小さなJavaプログラムを使います。
package codes.recursive;
public class KafkaProducerExample {
public static void main(String... args) throws Exception {
System.out.println("producer");
CompatibleProducer producer = new CompatibleProducer();
producer.produce();
}
}
CompatibleProducer クラスをビルドする前に、事前に取得しておいた必要な認証情報をIDEのRun/Debug設定に入れ込んでおきます。
べつにひどく複雑ってわけではないんですが、でもそれぞれの要素がわかりやすくなるように、ここで一度にはCompatibleProducer クラスだけをビルドすることにしましょう。まず、認証情報のための変数をいくつか宣言し、アプリケーションに渡す環境変数から値をセットするようにしておきます。
public class CompatibleProducer {
    public void produce() {
        String authToken = System.getenv("AUTH_TOKEN");
        String tenancyName = System.getenv("TENANCY_NAME");
        String username = System.getenv("STREAMING_USERNAME");
        String compartmentId = System.getenv("COMPARTMENT_ID");
        String topicName = System.getenv("TOPIC_NAME");
     }
}
次に、 KafkaProducerを構成するために使ういくつかのプロパティを作成します。これらはKafka SDKを使ってOSSのStreamにアクセスするために必要になるプロパティです。
:Streamingトピックを作成したリージョンに合わせて、"bootstrap.servers"のリージョンの値を変更する必要がある場合があります。
Properties properties = new Properties();
properties.put("bootstrap.servers", "streaming.us-phoenix-1.oci.oraclecloud.com:9092");
properties.put("security.protocol", "SASL_SSL");
properties.put("sasl.mechanism", "PLAIN");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
                + tenancyName + "/"
                + username + "/"
                + compartmentId + "\" "
                + "password=\""
                + authToken + "\";"
);
properties.put("retries", 5); // retries on transient errors and load balancing disconnection
properties.put("max.request.size", 1024 * 1024); // limit request size to 1MB
最後に、 KafkaProducer を構成してプロパティを入れ込み、5つの「test」メッセージをトピックにProduceしましょう。
KafkaProducer producer = new KafkaProducer<>(properties);
for (int i = 0; i < 5; i++) {
    ProducerRecord<String, String> record = new ProducerRecord<>(topicName, UUID.randomUUID().toString(), "Test record #" + i);
    producer.send(record, (md, ex) -> {
        if( ex != null ) {
            ex.printStackTrace();
        }
        else {
            System.out.println(
                    "Sent msg to "
                            + md.partition()
                            + " with offset "
                            + md.offset()
                            + " at "
                            + md.timestamp()
            );
        }
    });
}
producer.flush();
producer.close();
System.out.println("produced 5 messages");
あなたのIDEでこのプログラムを実行すると、以下のような出力が得られるでしょう。
> Task :producer:run
producer
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Sent msg to 0 with offset 57 at 1570626362254
Sent msg to 0 with offset 58 at 1570626362254
Sent msg to 0 with offset 59 at 1570626362254
Sent msg to 0 with offset 60 at 1570626362254
Sent msg to 0 with offset 61 at 1570626362254
produced 5 messages
BUILD SUCCESSFUL in 8s
2 actionable tasks: 2 executed
09:06:01: Task execution finished 'run'.
Disconnected from the target VM, address: '127.0.0.1:59701', transport: 'socket'
view rawoutput.sh hosted with ❤ by GitHub
ここでStreamingコンソールをちょっと見て、5つのメッセージがProduceされて表示されていることを確認してみましょう。
次に、Kafka互換のConsumerを作成してこれらのProduceされたメッセージをConsumeします。

Kafka Consumerの作成

Consumer作成のステップのうち大部分はProducer作成のためのステップと同様です(Gradleの依存ライブラリ、Run設定の環境変数、など)。なのでここでは CompatibleConsumer クラス自体にフォーカスしたいと思います。何かしら見逃しがあるんじゃないかと心配でも逃げ出さないでくださいね、このブログポストのコード全部がGitHubで見られます。わたしたちの互換Consumerを作成していきましょう!
The consumer starts out similarly - declaring the credentials, setting some properties (which do differ slightly from the producer, so beware!) and creating the Consumer itself:
Consumerは同じように作っていきます。認証情報を宣言し、プロパティを設定(Producer用のとはちょっと異なるので注意してくださいね!)し、 Consumer 自体を作成します。
:Procuderのときと同様、Streamingトピックを作成したリージョンに合わせて、"bootstrap.servers"のリージョンの値を変更する必要がある場合があります。
String authToken = System.getenv("AUTH_TOKEN");
String tenancyName = System.getenv("TENANCY_NAME");
String username = System.getenv("STREAMING_USERNAME");
String compartmentId = System.getenv("COMPARTMENT_ID");
String topicName = System.getenv("TOPIC_NAME");
Properties properties = new Properties();
properties.put("bootstrap.servers", "streaming.us-phoenix-1.oci.oraclecloud.com:9092");
properties.put("security.protocol", "SASL_SSL");
properties.put("sasl.mechanism", "PLAIN");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group-0");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put("sasl.jaas.config",
        "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
                + tenancyName + "/"
                + username + "/"
                + compartmentId + "\" "
                + "password=\""
                + authToken + "\";"
);
properties.put("max.partition.fetch.bytes", 1024 * 1024); // limit request size to 1MB per partition
Consumer<Long, String> consumer = new KafkaConsumer<>(properties);
ここで、作成しておいたトピックへのSubscriptionを構成し、1秒毎に新規メッセージをポーリングさせます。
try {
    consumer.subscribe(Collections.singletonList( topicName ) );
    while(true) {
        Duration duration = Duration.ofMillis(1000);
        ConsumerRecords<Long, String> consumerRecords = consumer.poll(duration);
        consumerRecords.forEach(record -> {
            System.out.println("Record Key " + record.key());
            System.out.println("Record value " + record.value());
            System.out.println("Record partition " + record.partition());
            System.out.println("Record offset " + record.offset());
        });
        // commits the offset of record to broker.
        consumer.commitAsync();
    }
}
catch(WakeupException e) {
    // do nothing, shutting down...
}
finally {
    System.out.println("closing consumer");
    consumer.close();
}
最後のステップはこのConsumerサンプルを実行した上でProducerに火を入れて、このConsumerが新しくProduceされたメッセージをConsumeするのを観察することです。
> Task :consumer:run
consumer
org.apache.kafka.common.security.plain.PlainLoginModule required username="toddrsharp/streaming-user/ocid1.compartment.oc1..aaaaaaaa7lzppsdxt6j56zhpvy6u5gyrenwyc2e2h4fak5ydvv6kt7anizbq" password="M0}tf)R<eCumKcgic6mC";
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Record Key 92a078df-89bd-4c55-a40b-668509f5d543
Record value Test record #0
Record partition 0
Record offset 62
Record Key ba12e804-49b0-49cc-ac6f-c45dc52bf543
Record value Test record #1
Record partition 0
Record offset 63
Record Key 29883f60-5fb1-4c94-a23f-5a489f109d82
Record value Test record #2
Record partition 0
Record offset 64
Record Key 00af0e12-a92f-4162-b2a6-22cdcf04fd73
Record value Test record #3
Record partition 0
Record offset 65
Record Key dd69776a-1bfd-419a-b6b0-7445c098b20e
Record value Test record #4
Record partition 0
Record offset 66
<=========----> 75% EXECUTING [35s]
view rawconsumer_output.sh hosted with ❤ by GitHub
はい、というわけで、あなたはKafka SDK for Javaを使ってOracle Streaming ServiceトピックとメッセージをProduce、Consumeできました。
Hey! このブログポストのコードはGitHubで見られるます: https://github.com/recursivecodes/oss-kafka-compatible-streaming