[Streaming] KafkaワークロードのOracle Cloud Streamingへの移行/Migrate Your Kafka Workloads To Oracle Cloud Streaming

原文はこちら

あるホスティング環境から別の環境へのアプリケーションとワークロードの移行はともするとたいへんな道のりになります。過去15年のあいだ、わたしは何度もなんとか移行をしなければならないことがあり、それらは完全に平易なものでも楽しいものでもありませんでした。そこでは避けがたく予期していなかった問題が生じ、時には単純なアップグレードですら終わりなき頭痛を引き起こすこともありました。なので、Oracle Cloudへアプリケーションを移行しなければならない開発者、そしてDevOpsエンジニアのみなさんには同情します。同時に、アプリケーションをOracle Cloudへ移行することによって得られる利益はいつだって不利益を上回るとも考えています。このような移行の労苦については多くの検討が内部でなされており、数あるツール、また、プロセスをより簡単で苦労を少なくするためのちょっとした「ワザ」が開発されています。このポストではそうした「ワザ」のうちひとつについてお話します。

Kafkaは疑いなくデータストリーミング(とその他)でポピュラーで、それは性能がよく、信頼でき、利用を容易にするためのSDK実装がいくつもあるからです。あなたのアプリケーションもすでにKafkaを利用しているかもしれません。おそらくあなたはあるマイクロサービスでメッセージをProduceし、他のマイクロサービスでConsumeしているかも。ではなぜこのような目的のために、KafkaではなくOracle Streaming Service(OSS)を使うことを検討するべきなのでしょうか?わたしの経験上、ZookeeperとKafkaクラスターをホストするためのインフラを構築し、保守するのは大変な苦労(とコスト)を伴い、したがってあなたは深い知識と構成管理のための余分な時間が必要になります。その代わりに、すぐに使い出せるホステッド環境を提供するOSSのようなサービスを活用することで、そうした時間(とコストの一部)が不要になります。このブログポストでは、あなたのアプリケーションでKafka SDK for Javaを使うことでOSSをかんたんに使えるということをご説明していきます。このKafka互換機能は現状ではLimited Availabilityでの提供となっていますが、遠くないうちにGAとなる予定です。

Streamの構成

まず初めに、このデモのためにちゃちゃっとStreamトピックを構成しちゃいましょう。Oracle Cloudのダッシュボードコンソールから、'Analytics' -> 'Streaming'と選択します。
Streamのリストページで、'Create Stream'をクリック(必要に応じてこのStreamを配置するのに適切なコンパートメントを左のサイドバーから選択しておいてください)。
Create Streamダイアログで、Streamの名前をつけ、Retentionの値(あるメッセージがトピックから破棄されるまでの時間)を入力します。
希望のパーティション数を入力して、'Create Stream'をクリック。
新しいStreamの詳細ページに直接遷移し、おおよそ30秒でStreamの状態がActiveと表示されるでしょう。

Streams Userの作成

次にやるのはStreamingサービス専用のユーザーの作成です。コンソールサイドバーメニューからIdentity配下のUsersをクリック。
'Create User'をクリックして出てくるダイアログを埋めます。
新しいユーザーが作成されたら、ユーザー詳細ページに行ってAuth Tokenを生成します。
Tokenをコピーしてしっかり保存しておきましょう。このGenerate Tokenダイアログをいったん離れると取得することはできません。Identityコンソールにいるはずなので、サイドバーからCompartmentsをクリックしてStreamを作成したコンパートメントを見つけてください。そのコンパートメントのOCISをコピーし、これもあとで使うので保存しておいてください。
さて、ここからグループを作成し、先ほど作成したユーザーをこのグループに追加し、グループポリシーを作成します。
ここまでAuth Tokenを持ち、Streamを利用できるグループに所属したユーザーが用意できました。やったぜ。また、コンパートメントのOCIDも控えてあるので、コードに進んでいく準備ができています。

Kafka Producerの作成

ここまででKafka Client SDKを使ってこのStreamを利用するコードへと進む準備ができました。わたしたちのプロジェクトでの最初のステップはClient SDKの依存するライブラリが存在することの確認です。わたしはGradleを使っていますが、あなたが他のものを使っているのであれば適切に修正してください。
plugins {
    id 'java'
    id 'application'
}
group 'codes.recursive'
version '0.1-SNAPSHOT'
sourceCompatibility = 1.8
repositories {
    mavenCentral()
}
application {
    mainClassName = 'codes.recursive.KafkaProducerExample'
}
dependencies {
    compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.3.0'
    testCompile group: 'junit', name: 'junit', version: '4.12'
}
tasks.withType(JavaExec) {
    systemProperties System.properties
}
view rawbuild.gradle hosted with ❤ by GitHub
ここでわたしはProducerをテストするために以下のような小さなJavaプログラムを使います。
package codes.recursive;
public class KafkaProducerExample {
public static void main(String... args) throws Exception {
System.out.println("producer");
CompatibleProducer producer = new CompatibleProducer();
producer.produce();
}
}
CompatibleProducer クラスをビルドする前に、事前に取得しておいた必要な認証情報をIDEのRun/Debug設定に入れ込んでおきます。
べつにひどく複雑ってわけではないんですが、でもそれぞれの要素がわかりやすくなるように、ここで一度にはCompatibleProducer クラスだけをビルドすることにしましょう。まず、認証情報のための変数をいくつか宣言し、アプリケーションに渡す環境変数から値をセットするようにしておきます。
public class CompatibleProducer {
    public void produce() {
        String authToken = System.getenv("AUTH_TOKEN");
        String tenancyName = System.getenv("TENANCY_NAME");
        String username = System.getenv("STREAMING_USERNAME");
        String compartmentId = System.getenv("COMPARTMENT_ID");
        String topicName = System.getenv("TOPIC_NAME");
     }
}
次に、 KafkaProducerを構成するために使ういくつかのプロパティを作成します。これらはKafka SDKを使ってOSSのStreamにアクセスするために必要になるプロパティです。
:Streamingトピックを作成したリージョンに合わせて、"bootstrap.servers"のリージョンの値を変更する必要がある場合があります。
Properties properties = new Properties();
properties.put("bootstrap.servers", "streaming.us-phoenix-1.oci.oraclecloud.com:9092");
properties.put("security.protocol", "SASL_SSL");
properties.put("sasl.mechanism", "PLAIN");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
                + tenancyName + "/"
                + username + "/"
                + compartmentId + "\" "
                + "password=\""
                + authToken + "\";"
);
properties.put("retries", 5); // retries on transient errors and load balancing disconnection
properties.put("max.request.size", 1024 * 1024); // limit request size to 1MB
最後に、 KafkaProducer を構成してプロパティを入れ込み、5つの「test」メッセージをトピックにProduceしましょう。
KafkaProducer producer = new KafkaProducer<>(properties);
for (int i = 0; i < 5; i++) {
    ProducerRecord<String, String> record = new ProducerRecord<>(topicName, UUID.randomUUID().toString(), "Test record #" + i);
    producer.send(record, (md, ex) -> {
        if( ex != null ) {
            ex.printStackTrace();
        }
        else {
            System.out.println(
                    "Sent msg to "
                            + md.partition()
                            + " with offset "
                            + md.offset()
                            + " at "
                            + md.timestamp()
            );
        }
    });
}
producer.flush();
producer.close();
System.out.println("produced 5 messages");
あなたのIDEでこのプログラムを実行すると、以下のような出力が得られるでしょう。
> Task :producer:run
producer
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Sent msg to 0 with offset 57 at 1570626362254
Sent msg to 0 with offset 58 at 1570626362254
Sent msg to 0 with offset 59 at 1570626362254
Sent msg to 0 with offset 60 at 1570626362254
Sent msg to 0 with offset 61 at 1570626362254
produced 5 messages
BUILD SUCCESSFUL in 8s
2 actionable tasks: 2 executed
09:06:01: Task execution finished 'run'.
Disconnected from the target VM, address: '127.0.0.1:59701', transport: 'socket'
view rawoutput.sh hosted with ❤ by GitHub
ここでStreamingコンソールをちょっと見て、5つのメッセージがProduceされて表示されていることを確認してみましょう。
次に、Kafka互換のConsumerを作成してこれらのProduceされたメッセージをConsumeします。

Kafka Consumerの作成

Consumer作成のステップのうち大部分はProducer作成のためのステップと同様です(Gradleの依存ライブラリ、Run設定の環境変数、など)。なのでここでは CompatibleConsumer クラス自体にフォーカスしたいと思います。何かしら見逃しがあるんじゃないかと心配でも逃げ出さないでくださいね、このブログポストのコード全部がGitHubで見られます。わたしたちの互換Consumerを作成していきましょう!
The consumer starts out similarly - declaring the credentials, setting some properties (which do differ slightly from the producer, so beware!) and creating the Consumer itself:
Consumerは同じように作っていきます。認証情報を宣言し、プロパティを設定(Producer用のとはちょっと異なるので注意してくださいね!)し、 Consumer 自体を作成します。
:Procuderのときと同様、Streamingトピックを作成したリージョンに合わせて、"bootstrap.servers"のリージョンの値を変更する必要がある場合があります。
String authToken = System.getenv("AUTH_TOKEN");
String tenancyName = System.getenv("TENANCY_NAME");
String username = System.getenv("STREAMING_USERNAME");
String compartmentId = System.getenv("COMPARTMENT_ID");
String topicName = System.getenv("TOPIC_NAME");
Properties properties = new Properties();
properties.put("bootstrap.servers", "streaming.us-phoenix-1.oci.oraclecloud.com:9092");
properties.put("security.protocol", "SASL_SSL");
properties.put("sasl.mechanism", "PLAIN");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group-0");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put("sasl.jaas.config",
        "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
                + tenancyName + "/"
                + username + "/"
                + compartmentId + "\" "
                + "password=\""
                + authToken + "\";"
);
properties.put("max.partition.fetch.bytes", 1024 * 1024); // limit request size to 1MB per partition
Consumer<Long, String> consumer = new KafkaConsumer<>(properties);
ここで、作成しておいたトピックへのSubscriptionを構成し、1秒毎に新規メッセージをポーリングさせます。
try {
    consumer.subscribe(Collections.singletonList( topicName ) );
    while(true) {
        Duration duration = Duration.ofMillis(1000);
        ConsumerRecords<Long, String> consumerRecords = consumer.poll(duration);
        consumerRecords.forEach(record -> {
            System.out.println("Record Key " + record.key());
            System.out.println("Record value " + record.value());
            System.out.println("Record partition " + record.partition());
            System.out.println("Record offset " + record.offset());
        });
        // commits the offset of record to broker.
        consumer.commitAsync();
    }
}
catch(WakeupException e) {
    // do nothing, shutting down...
}
finally {
    System.out.println("closing consumer");
    consumer.close();
}
最後のステップはこのConsumerサンプルを実行した上でProducerに火を入れて、このConsumerが新しくProduceされたメッセージをConsumeするのを観察することです。
> Task :consumer:run
consumer
org.apache.kafka.common.security.plain.PlainLoginModule required username="toddrsharp/streaming-user/ocid1.compartment.oc1..aaaaaaaa7lzppsdxt6j56zhpvy6u5gyrenwyc2e2h4fak5ydvv6kt7anizbq" password="M0}tf)R<eCumKcgic6mC";
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Record Key 92a078df-89bd-4c55-a40b-668509f5d543
Record value Test record #0
Record partition 0
Record offset 62
Record Key ba12e804-49b0-49cc-ac6f-c45dc52bf543
Record value Test record #1
Record partition 0
Record offset 63
Record Key 29883f60-5fb1-4c94-a23f-5a489f109d82
Record value Test record #2
Record partition 0
Record offset 64
Record Key 00af0e12-a92f-4162-b2a6-22cdcf04fd73
Record value Test record #3
Record partition 0
Record offset 65
Record Key dd69776a-1bfd-419a-b6b0-7445c098b20e
Record value Test record #4
Record partition 0
Record offset 66
<=========----> 75% EXECUTING [35s]
view rawconsumer_output.sh hosted with ❤ by GitHub
はい、というわけで、あなたはKafka SDK for Javaを使ってOracle Streaming ServiceトピックとメッセージをProduce、Consumeできました。
Hey! このブログポストのコードはGitHubで見られるます: https://github.com/recursivecodes/oss-kafka-compatible-streaming