[Streaming] Oracle Streaming ServiceとAutonomous DBでKafka Connectを使ってみる/Using Kafka Connect With Oracle Streaming Service And Autonomous DB

原文はこちら

メッセージのConsumeとProduceにいまお使いなのがMicronautのようなフレームワークであれ、Kafka SDKそのものであれ、あなたのアプリケーションの中でメッセージングを扱うのに、Oracle Streaming Service(OSS)はかんたんでより安価な方法です。Kafkaクラスターを手で組み上げてそのコストと保守作業に気をもむ…といったことが必要なくなります。ただStreamを作成して、あとはそれを使ってメッセージをProduce、Consumeするだけでよいのです。
とはいえ、それだけでは足りない場合もありますよね。マイクロサービスアーキテクチャではメッセージングは実に重要で、今まではOracle Streaming Serviceではそのうち輸送の部分、ProduceとConsumeのみを扱っていました。つまり、やり取りの受信元と送信先は両方ともアプリケーション側の責務の範囲でした。しかしそれも過去の話、というのもOracle Streaming ServiceでKafka Connectを使えるようになったのです。
Kafka Connectがなにかって?いい質問ですね!Kafka Connectは外部ソースをKafkaに(あるいは、わたしたちの場合はOracle Streaming Serviceに)つなげるためのオープンソースフレームワークです。外部ソースとはたとえばオブジェクトストレージやデータベース、Key-Valueストアなどがあたります。Kafka Connectについて知っておいたほうがいい用語がふたつあります。ひとつはSource Connector、もうひとつがSink Connectorです。Source Connectorを使うと外部ソースからデータを取ってくることができ、Sink Connectorでは外部ソースにデータを送ることができます。
実際イケてるフレームワークですよこれは。考えてみてください、データベーステーブルにレコードがインサートされるたびにメッセージを受け取るStreamを作ることもできるんですよ!TopicにメッセージをProduceするだけでレコードをテーブルにインサートすることだってできる!でもお喋りはここまでにして、実際こういうことをどうやって実現するのか見ていきましょう。ぜんぜん難しくありません、一通りステップバイステップでやっていきましょう。

Kafka Connectのインテグレーションの準備

このチュートリアルでは、Autonomous Transaction Processing(ATP)インスタンス上のテーブルからデータを取ってくるSource Connectorを作成します。ただ、このインテグレーション編に進む前にいくつか準備が必要です。あなたのマシンに諸々のファイルを保存する用のプロジェクトディレクトリを作っておくのがよいでしょう。そのディレクトリをここでは /projects/connect-demo として参照しますが、必要に応じてご自身のパスに読み替えてください。

Autonomous DBのセットアップ

インテグレーションに使ってみる用のAutonomous DBを作成し、Walletの認証情報を取得していきましょう。インスタンスをお持ちでない場合は、こちらのガイドを参照してAutonomous DBをサクッと作成して起動しましょう。お望みであればalways freeのATPインスタンスを使っていただいてもちゃんとできますよ。
用意できたらまず、その起動中のインスタンスにSQL Developerでつなげて(またはSQL Developer Webを使ってもOK)、いくつか権限を付与しつつ新しいユーザーを作成しましょう:
CREATE USER connectdemo IDENTIFIED BY "Str0ngP@$$word1234";
GRANT CONNECT, RESOURCE TO connectdemo;
GRANT UNLIMITED TABLESPACE TO connectdemo;
view rawcreate-user.sql hosted with ❤ by GitHub
次に、ちょっとしたテーブルを作成します。
CREATE TABLE TEST 
(
  ID NUMBER(10,0) GENERATED BY DEFAULT ON NULL AS IDENTITY,
  USERNAME VARCHAR2(50) NOT NULL,
  FIRST_NAME VARCHAR2(50) NOT NULL,
  MIDDLE_NAME VARCHAR2(50),
  LAST_NAME VARCHAR2(50) NOT NULL,
  AGE NUMBER(5,0) DEFAULT 0 NOT NULL,
  CREATED_ON TIMESTAMP(9) NOT NULL,
  CONSTRAINT TEST_PK PRIMARY KEY 
  (
    ID 
  )
  ENABLE 
);
view rawcreate-table.sql hosted with ❤ by GitHub
これで準備はOKです!次に進みましょう。

必要なモジュールのダウンロード

プロジェクトディレクトリに3つのものをダウンロードする必要があります:
  1. Oracle JDBC Drivers
  2. Kafka JDBC Connector
  3. あなたのATPインスタンスのWallet
OracleドライバのZIPの中身を /projects/connect-demo/drivers に配置して、Kafka JDBC ConnectorのZIPの中身は /projects/connect-demo/kafka-jdbc/connector に配置してください。次に、Walletを /projects/connect-demo/wallet に配置しましょう。WallteはコンソールUIかOCI CLI経由でダウンロードできます。CLIでサクッとやる場合には:
# list your Autonomous instances
oci db autonomous-database list
# find your instance in the list and get the OCID
# substitute the OCID below and run:
oci db autonomous-database generate-wallet \
--autonomous-database-id \
ocid1.autonomousdatabase.oc1.phx... \
--file /projects/connect-demo/wallet/wallet.zip \
--password Str0ngP@$$word
view rawdownload-wallet.sh hosted with ❤ by GitHub
注意:WalletのZIPを解凍するのをお忘れなく。中身は /projects/connect-demo/wallet に配置してくださいね。
OK、これでダウンロードが必要なものについては揃いました。ではではStreamへと飛び込んでいきましょう!

Stream Poolの作成

次に作成が必要な部品がふたつあります。Stream Poolと、Connect Configurationです。これらを作成するのにOCI SDKを使ってお好みの通りにコードでやることもできますが、ここではより簡潔にコンソールUIでやっちゃいましょう。まずはコンソールのハンバーガーメニューから'Analytics' -> 'Streaming'とクリックしてStreamingのページに向かいましょう:
次に、Streaming初期ページの左側のメニューから、'Stream Pools'を選択。
そして'Create Stream Pool'をクリック。
適当な名前を入力して'Auto Create Topics'にチェックを入れましょう。これによりKafka Connectが必要に応じてTopicを作成できるようになるんですが、これはKafkaの 'auto.create.topics.enable' の設定と同等です。
"Create Stream Pool'をクリックしてちょっと待てばPoolが'Active'になります。そうしたら'View Kafka Connection Settings'ボタンをクリックしましょう。
ここで出てくる情報はあとで使うためどこかにコピーしておいてください。なお、SASL Connection Stringのusernameは別のものを使うこともできます(詳細はこのポストのCreate a Streams Userを読んで下さい)。
Poolができあがりましたので、Connect Configurationを料理していきましょう。

Connect Configurationの作成

次はサイドバーから'Kafka Connect Configuration'をクリックしてCreate~~のボタンをクリック。
出てくるダイアログでは適当な名前を入力してください。
作成されたら、表示されるConnect Configuration OCIDとKafka Connect Storage Topicsをどこかにコピーしておいてください。
これでOK、次にいきましょう。

Kafka Connectの構成と起動

ここまででKafka Connectを起動し、Source Connectorを作成して TEST テーブルをリッスンさせるための用意ができました。ここではDevezium Connect Dockerイメージを使ってシンプルかつコンテナ化されたやり方を取りますが、公式のKafka Connect Dockerイメージを使ってもいいですし、バイナリ版を使ってもいいです。Dockerイメージを起動する前に、Connectを設定するためのプロパティファイルのセットアップが必要です。前述したステップで集めておいた情報が必要になるので手元に置いといてくださいね。また、Stream Poolから取ってきたStreamingのusername(SASL Connection Stringを参照)と認証トークンも必要です。Streaming専用ユーザーと認証トークンの生成方法についてはこちらのブログポストを参照ください。
以下の内容で /projects/connect-demo/connect-distributed.properties のファイルを作成してください。<括弧>の部分はご自身の実際の値で置き換えてください。
group.id=connect-demo-group
bootstrap.servers=<streamPoolBootstrapServer>
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<tenancy>/<username>/<streamPoolId>" password="<authToken>";
producer.sasl.mechanism=PLAIN
producer.security.protocol=SASL_SSL
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<tenancy>/<username>/<streamPoolId>" password="<authToken>";
consumer.sasl.mechanism=PLAIN
consumer.security.protocol=SASL_SSL
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<tenancy>/<username>/<streamPoolId>" password="<authToken>";
config.storage.replication.factor=1
config.storage.partitions=1
config.storage.topic=<connectConfigOCID>-config
status.storage.replication.factor=1
status.storage.partitions=1
status.storage.topic=<connectConfigOCID>-status
offset.storage.replication.factor=1
offset.storage.partitions=1
offset.storage.topic=<connectConfigOCID>-offset
offset.flush.interval.ms=10000
offset.flush.timeout.ms=5000
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
task.shutdown.graceful.timeout.ms=10000
コンテナに依存モジュールを収めないとならないので、以下のような debezium/connect イメージベースの /projects/connect-demo/Dockerfile を作成してください。
FROM debezium/connect:0.10
USER root
RUN mkdir /wallet
USER kafka
COPY driver/* /kafka/libs/
COPY kafka-connect-jdbc/lib/* /kafka/libs/
COPY wallet/* /wallet/
view rawDockerfile hosted with ❤ by GitHub
ではこのDockerイメージをビルドしていきます。私はTopic名の置換を簡単にするためのBashスクリプトを作ってみましたが、<括弧>の内容を置き換えつつここにあるコマンドをそれぞれ手動で実行して頂いても構いません。なお、先程作成した /projects/connect-demo/connect-distributed.properties ファイルをDockerコンテナにマウントしている点に留意ください。
#!/usr/bin/env bash
CONNECT_CONFIG_ID=<connectConfigOCID>
CONFIG_STORAGE_TOPIC=$CONNECT_CONFIG_ID-config
OFFSET_STORAGE_TOPIC=$CONNECT_CONFIG_ID-offset
STATUS_STORAGE_TOPIC=$CONNECT_CONFIG_ID-status
docker build -t connect .
docker run -it --rm --name connect -p 8083:8083 \
-e GROUP_ID=connect-demo-group \
-e BOOTSTRAP_SERVERS=<streamPoolBootstrapServer> \
-e CONFIG_STORAGE_TOPIC=$CONFIG_STORAGE_TOPIC \
-e OFFSET_STORAGE_TOPIC=$OFFSET_STORAGE_TOPIC \
-e STATUS_STORAGE_TOPIC=$STATUS_STORAGE_TOPIC \
-v `pwd -P`/connect-distributed.properties:/kafka/config.orig/connect-distributed.properties \
connect
view rawconnect-demo.sh hosted with ❤ by GitHub
このBashスクリプトを実行することでConnectインスタンスを起動できます。マシンやネットワークにもよりますが、だいたい30~45秒くらいで起動されます。起動したら、REST APIでConnectorを作成できるようになっていますが、その前にJSONのConnector設定ファイルが必要です。
{
"name": "oss-atp-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max": "100",
"connection.url": "jdbc:oracle:thin:@demodb_high?TNS_ADMIN=/wallet",
"connection.user": "coannectdemo",
"connection.password": "Str0ngP@$$word1234",
"mode": "incrementing",
"incrementing.column.name": "ID",
"topic.prefix": "demo-stream-",
"table.whitelist": "TEST",
"numeric.mapping": "best_fit"
}
}
留意してほしいことがいくつかあります。ここのConnection URLはもし過去にATPやJDBCを使ったことがあればおなじみのものでしょう。これはあなたのWallet内のtnsnames.oraファイルの選択したエントリーを参照しており、また、Walletへのパスも指定しています(Dockerコンテナ内でのパスで、これは前のステップでファイルシステムのルートに置いてありました)。UserとPasswordは前のステップで作成しておいたスキーマの認証情報です。 topic.prefix のエントリは table.whitelist 内に指定したテーブルでTopicのPrefixとして使われます。
では、REST APIでこの設定を POST してSource Connectorを作成しましょう:
curl -iX POST -H "Accept:application/json" -H "Content-Type:application/json" -d @connector-config.json http://localhost:8083/connectors
view rawcreate-connector.sh hosted with ❤ by GitHub
すべてのConnectorの一覧の取得には GET リクエストしましょう。
curl -i http://localhost:8083/connectors
view rawlist-connectors.sh hosted with ❤ by GitHub
Connectorを削除したければ DELETE リクエストです。
curl -i -X DELETE http://localhost:8083/connectors/[connector-name]
view rawdelete-connector.sh hosted with ❤ by GitHub
他の操作についてはConnect REST APIドキュメントを参照してください。
Connectorが作成できたら、Whitelistに入れたテーブルにそれぞれTopicが作成され、指定したTopic Prefixとテーブル名から成る名前がつけられて利用可能になります。

インテグレーションを試してみる

Streamが利用可能になったら、テーブルにいくつかのレコードをインサートしてトランザクションをコミットしてみましょう:
INSERT INTO TEST (username, first_name, middle_name, last_name, age, created_on) 
VALUES ('todd', 'Todd', null, 'Sharp', 42, sysdate);
INSERT INTO TEST (username, first_name, middle_name, last_name, age, created_on) 
VALUES ('gvenzl', 'Gerald', null, 'Venzl', 30, sysdate);
INSERT INTO TEST (username, first_name, middle_name, last_name, age, created_on) 
VALUES ('aalmiray', 'Andres', null, 'Almiray', 40, sysdate);
view rawinsert.sql hosted with ❤ by GitHub
コンソールで当該のStreamをクリックして、'Load Messages'をクリックすると最近のメッセージが表示されます。 TEST テーブルにインサートしたレコードごとにメッセージがあることが確認できるでしょう。
メッセージの値をクリックすると詳細が表示されます。
これでおしまい!

まとめ

このポストではATPにテストスキーマとテーブルを作成し、Stream PoolとConnect Configurationを作成し、Kafka ConnectのインスタンスをDebezium Dockerイメージを使って起動して、ATP用のSource ConnectorをKafka Connect上に作成しました。テーブルにレコードをインサートすると、そのレコードがStreamにメッセージとしてPublishされているのを見て取れました。
Kafka Connectによるインテグレーションはとてもパワフルで、Oracle Cloud上のどんなマイクロサービスにでも使うことができるでしょう。

0 件のコメント:

コメントを投稿