[Streaming] Oracle Streaming ServiceでObject Storageにメッセージを送信/Publishing To Object Storage From Oracle Streaming Service

原文はこちら

Oracle Streaming Serviceに関する前回のポストでは、Oracle Streaming Serviceの持つKafka Connectとの組み合わせ機能を使ってAutonomous DBインスタンス上での更新内容を直接StreamにPublishする方法についてご紹介しました。このポストでは、前回と同じくらいすごいことをお見せします。それは何かというと、Streamの内容を直接Object Storageバケットに書き込む方法です。手順は前回のものと似ているところもあるんですが、いくつか明らかな違いもあります。今回は、Kafka ConnectのS3 Sink Connectorを使って目的を実現していきます。Oracle Object StorageはS3と完全互換のエンドポイントを持っているため、このS3 Sink Connectorを使って簡単にStreamのデータをOCIバケットに入れられちゃうんです。以下のチュートリアルを読めばやり方は全部わかるようになってますよ、ということで始めましょう!

S3 Sink Connectorのための準備

ユーザーのセットアップ

始める前に、あなたのマシンのどこかに、いくつかの必要なファイルを置くためのプロジェクトディレクトリを作っておくとよいでしょう。ここではそのディレクトリを /projects/object-storage-demo と参照することにしますが、あなたの実際のディレクトリで読み替えるようにしてください。
秘密鍵と適切なポリシーの用意の他に、認証トークンを付与した専用ユーザーが必要になります。これについてはこのポストで説明しているステップに従ってください。ユーザーが用意できたら、そのユーザーについて以下の手順でいくつか設定をいじっていきます。

秘密鍵の生成

Streamのユーザーのために秘密鍵の生成が必要になります。ここでの手順では、アクセスキーと、Kafka S3 Sink Connectorが必要とするS3互換認証のために使う秘密鍵が生成されます。用意したユーザーの詳細ベージに行って、サイドバーメニューから'Customer Secret Keys'をクリックし、そして'Generate Secret Key'をクリックしてください:
鍵に名前をつけて'Generate Secret Key'をクリック。
生成された鍵をコピーしてください。これがあなたのS3互換の秘密鍵の値になります。後で使うのでどこかに保存しておきましょう。
Closeをクリックいsて、次に'Access Key'の値をコピーしましょう。これも保存しておきましょう。
次に進む前に、 /projects/object-storage-demo/aws_credentials のファイルを作って以下の内容を記載しておきます。Secret KeyとAccess Keyは実際のものに置き換えてください。
[default]
aws_access_key_id=[generated access key]
aws_secret_access_key=[generated secret key]
view rawaws_credentials hosted with ❤ by GitHub

ポリシーの修正

次に、専用ユーザー用に作成されたポリシーを、Object Storageにアクセスできるように修正していきます。以下のようにふたつのポリシーを追加してください:

依存モジュールのダウンロード

S3 Connectorを持ってこなければなりませんので、Confluentからダウンロードしてプロジェクトディレクトリの /projects/object-storage-demo/confluentinc-kafka-connect-s3-5.3.2 にUnzipした中身を配置しましょう。

Oracle Streamingのアセット準備

Stream PoolとStream、そしてConnect Configurationが必要になります。以下でそれぞれ作っていきます。コンソールのハンバーガーメニューから'Analytics' -> 'Streaming'を選択。

Stream PoolとStreamの作成

Streamingのページから、'Stream Pools'を選択し、'Create Stream Pool'をクリック。
Stream Poolに名前をつけて、'Create Stream Pool'をクリック。
Stream Poolがアクティブになったら、Stream Pool OCIDをコピーしてどこかに保存しておいてください。次に'View Kafka Connection Settings'ボタンをクリックします。
Bootstrap Serverの値をコピーします。これもあとで使います。
Stream Poolの詳細ページで、'Create Stream'をクリック。
Streamに名前をつけて、'Create Stream'をクリック。ここで付けた名前もあとで使います。

Connect Configurationの作成

次はサイドバーメニューの'Kafka Connect Configurations'をクリックし、'Create Kafka Connect Configuration'をクリックします。
名称を入力して、'Create Kafka Connect Configuration'をクリック。
Connect Configurationの詳細ページで、OCIDをコピーしてどこかに保存しておきます。

Object Storageの準備

最終的にメッセージが格納されることになるバケットを作成しておく必要があります。コンソールのハンバーガーメニューからObject Storageに行きましょう。
'Create Bucket'をクリックし、出てきたダイアログでバケットの名前を入力して作成しましょう。
これでKafka Connectの構成と起動の準備ができました。

Kafka Connectの構成と起動

ここまででKafka Connectを起動し、S3 Sink Connectorを作成してメッセージをObject StorageにPublishする準備ができました。ここではDevezium Connect Dockerイメージを使ってシンプルかつコンテナ化されたやり方を取りますが、公式のKafka Connect Dockerイメージを使ってもいいですし、バイナリ版を使ってもいいです。Dockerイメージを起動する前に、Connectを設定するためのプロパティファイルのセットアップが必要です。前述したステップで集めておいた情報が必要になるので手元に置いといてくださいね。また、Stream Poolから取ってきたStreamingのusername(SASL Connection Stringを参照)と認証トークンも必要です。
以下の内容で /projects/object-storage-demo/connect-distributed.properties のファイルを作成しましょう。<括弧>の部分は実際の値で置き換えてください。
bootstrap.servers=<bootstrap server from stream pool connection settings>
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<tenancy>/<username>/<stream pool OCID>" password="<auth token>";
producer.sasl.mechanism=PLAIN
producer.security.protocol=SASL_SSL
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<tenancy>/<username>/<stream pool OCID>" password="<auth token>";
consumer.sasl.mechanism=PLAIN
consumer.security.protocol=SASL_SSL
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<tenancy>/<username>/<stream pool OCID>" password="<auth token>";
database.history.producer.sasl.mechanism=PLAIN
database.history.producer.security.protocol=SASL_SSL
database.history.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<tenancy>/<username>/<stream pool OCID>" password="<auth token>";
database.history.consumer.sasl.mechanism=PLAIN
database.history.consumer.security.protocol=SASL_SSL
database.history.consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<tenancy>/<username>/<stream pool OCID>" password="<auth token>";
retries=1
max.in.flight.requests.per.connection=1
config.storage.replication.factor=1
status.storage.replication.factor=1
offset.storage.replication.factor=1
config.storage.partitions=1
status.storage.partitions=1
offset.storage.partitions=1
offset.flush.interval.ms=10000
offset.flush.timeout.ms=5000
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
task.shutdown.graceful.timeout.ms=10000
前のステップで保存しておいたConnect ConfigurationのOCIDをあなたのシェルに環境変数としてセットしましょう。
export CONFIG_ID=<connect config id>
view rawconfig_id.sh hosted with ❤ by GitHub
それではDockerイメージを以下のように起動します:
docker run -it --rm --name connect-demo -p 8083:8083 -e GROUP_ID=1                                                                                                                                                                                          
    -e BOOTSTRAP_SERVERS="cell-1.streaming.us-phoenix-1.oci.oraclecloud.com:9092" \
    -e CONFIG_STORAGE_TOPIC=$CONFIG_ID-config \
    -e OFFSET_STORAGE_TOPIC=$CONFIG_ID-offset \
    -e STATUS_STORAGE_TOPIC=$CONFIG_ID-status \
    -v $(pwd -L)/connect-distributed.properties:/kafka/config.orig/connect-distributed.properties \
    -v $(pwd -L)/confluentinc-kafka-connect-s3-5.3.2/:/kafka/connect/confluentinc-kafka-connect-s3-5.3.2 \
    -v $(pwd -L)/aws_credentials:/kafka/.aws/credentials \
    debezium/connect:latest
view rawdocker-run.sh hosted with ❤ by GitHub
Kafka Connectが起動したら、Connectorの設定を記述するJSONファイルを作りましょう。以下の内容で /projects/object-storage-demo/connector-config.json にファイルを作ってください。
{
"name": "oss-object-storage-demo",
"config": {
"name":"oss-object-storage-demo",
"connector.class":"io.confluent.connect.s3.S3SinkConnector",
"tasks.max":"1",
"topics":"<your stream name>",
"format.class":"io.confluent.connect.s3.format.json.JsonFormat",
"storage.class":"io.confluent.connect.s3.storage.S3Storage",
"flush.size":"1",
"s3.bucket.name":"<your object storage bucket name>",
"store.url":"https://<namespace (usually your tenancy name)>.compat.objectstorage.us-phoenix-1.oraclecloud.com",
"s3.region":"us-phoenix-1"
}
}
 topic と s3.bucket.name および store.url の値は適切に置き換えてください(URLの中のリージョン部分と、s3.regionの値も変更が必要かも)。Object Storageに生成されるファイルにひとつ以上のメッセー意jが書かれるようにしたいなら、 flush.size の値も適切に更新しましょう。他のカスタマイズについてはS3 Sinkのドキュメントを参照ください。
では、REST APIでこの設定を POST してSource Connectorを作成しましょう:
curl -iX POST -H "Accept:application/json" -H "Content-Type:application/json" -d @connector-config.json http://localhost:8083/connectors
view rawcreate-connector.sh hosted with ❤ by GitHub
すべてのConnectorの一覧の取得には GET リクエストしましょう。
curl -i http://localhost:8083/connectors
view rawlist-connectors.sh hosted with ❤ by GitHub
Connectorを削除したければ DELETE リクエストです。
curl -i -X DELETE http://localhost:8083/connectors/[connector-name]
view rawdelete-connector.sh hosted with ❤ by GitHub

インテグレーションを試してみる

ここまででインテグレーションを試せるようになっています。OCIコンソールのStreamの詳細ページに行って、'Produce Test Message'をクリックしてTopicにJSON文字列のメッセージをポストしてみましょう。
Connect Dockerコンソールでいくつかアクションが表示されるでしょう。
2019-12-20 17:49:58,664 INFO   ||  Starting commit and rotation for topic partition oss-demo-stream-0 with start offset {partition=0=0}   [io.confluent.connect.s3.TopicPartitionWriter]
2019-12-20 17:49:59,127 INFO   ||  Files committed to S3. Target commit offset for oss-demo-stream-0 is 1   [io.confluent.connect.s3.TopicPartitionWriter]
view rawconsole.sh hosted with ❤ by GitHub
バケットに移動してファイルが書き込まれているか見てみましょう。
"Pre-Authenticated Request"を作成すると、ファイルをダウンロードして内容を見られるようになります。

まとめ


このポストではStream PoolとStreamおよびConnect Configurationを作成、設定し、これらのアセットを使ってStreamからObject Storageへとメッセージをファイルとしてパブリッシュしました。

0 件のコメント:

コメントを投稿