原文はこちら。
https://community.oracle.com/docs/DOC-1006738
リアクティブ・プログラミング(Reactive Programming)は非同期データ・アイテムのストリームを処理するものです。アプリケーションはデータ・アイテムが発生すると、そのデータ・アイテムに反応します。Rahul Srivastavaによるこの記事で、JDK 9 Flow APIを使った例をご紹介します。
What is Reactive Programming ?
Reactive Programmingとは、非同期データ項目のストリームを処理するものです。ここで、アプリケーションはデータ項目が発生するとそのデータに対応します。データストリームは基本的に時系列で発生するデータ項目の並び(シーケンス)です。このモデルは、データをストリームとして処理するため、インメモリデータの反復処理と比較するとメモリ効率がよいものです。
Reactive Programmingモデルでは、Publisher(パブリッシャ)とSubscriber(サブスクライバ)が存在します。Publisherはデータストリームを発行(publish)し、Subscriberは非同期で当該データストリームを購読(subscribe)します。
このモデルは、Processorを使ってストリーム上で動作するより高い高階関数を提供するメカニズムも提供します。Processorはデータストリームを変換します。この際、PublisherやSubscriberを変更する必要はありません。Processor (もしくはProcessorのチェーン)はPublisherとSubscriberの間に位置し、データストリームを次々変換します。PublisherとSubscriberはデータストリームに発生する変換とは無関係です。
Why Reactive Programming ?
- よりシンプルなコードで、より可読性が高い
- ボイラープレートを取り除き、ビジネスロジックに集中できる
- 低レベルのスレッド、同期、並行性の問題を取り除くことができる
- ストリーム処理はメモリ効率がよい
- このモデルを適用すると、ほとんど全ての種類の問題を解決することができる
JDK 9 Flow API
JDK 9のFlow APIはデファクトスタンダードのReactive Streams Specificationに対応します。Reactive Streams SpecificationはReactive Programmingを標準化するイニシアティブの一つです。
Reactive Streams Specification
http://www.reactive-streams.org/
既にいくつかの実装がReactive Streams Specificationをサポートしています。
Reactive Streams 1.0.0 is here!
http://www.reactive-streams.org/announce-1.0.0
Flow API Interfaces
@FunctionalInterface
public static interface Flow.Publisher<T> {
public void subscribe(Flow.Subscriber<? super T> subscriber);
}
public static interface Flow.Subscriber<T> {
public void onSubscribe(Flow.Subscription subscription);
public void onNext(T item) ;
public void onError(Throwable throwable) ;
public void onComplete() ;
}
public static interface Flow.Subscription {
public void request(long n);
public void cancel() ;
}
public static interface Flow.Processor<T,R> extends Flow.Subscriber<T>, Flow.Publisher<R> {
}
Flow API (とReactive Streams API)は、ある点では、Iteretor(イテレータ)やObserver(オブザーバ)パターンからのアイデアの組み合わせです。Iteretorは、アプリケーションがソースからアイテムを引っ張るというプル・モデルです。Observerは、ソースからの項目をアプリケーションにプッシュするプッシュ・モデルです。Flow APIを使うと、アプリケーションは最初にN個の項目を要求し、その後Publisherは最大でN
個の項目をSubscriberにプッシュします。それゆえ、PullプログラミングモデルとPushプログラミングモデルの組合せなのです。
The Subscriber
SubscriberはコールバックのためにPublisherを購読します。データ項目はリクエストがない限りSubscriberにプッシュされませんが、複数の項目がリクエストされる可能性があります。指定されたSubscriptionに対するSubscriberのメソッド呼び出しは厳密に順序付けされます。アプリケーションは、Subscriberが利用可能な、以下のコールバックに反応することができます。
コールバック(Callback) | 説明 |
onSubscribe | 指定されたSubscriptionのための、任意の他のSubscriberメソッドを呼び出す前に呼び出されるメソッド |
onNext | Subscriptionの次の項目を伴って呼び出されるメソッド |
onError | PublisherもしくはSubscriptionによってリカバリできないエラーが発生したときに呼び出されるメソッド。その後、Subscriptionは他のSubscriberメソッドを呼び出さない。
PublisherがSubscriberにデータ項目を発行できないというエラーに遭遇した場合、SubscriberはonErrorを受け取り、その後それ以上のメッセージを受信しない。 |
onComplete | エラーによって未終了のSubscriptionのために追加のSubscriberメソッド呼び出しが発生しないことがわかっている場合に呼び出されるメソッド。このあと、別のSubscriberメソッドはSubscriptionから呼び出されることはない。
後続のメッセージがSubscriberに発行されないことがわかっている場合、SubscriberはonCompleteを受け取る。 |
Subscriberのサンプル
import java.util.concurrent.Flow.*;
...
public class MySubscriber<T> implements Subscriber<T> {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1); //a value of Long.MAX_VALUE may be considered as effectively unbounded
}
@Override
public void onNext(T item) {
System.out.println("Got : " + item);
subscription.request(1); //a value of Long.MAX_VALUE may be considered as effectively unbounded
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("Done");
}
}
The Publisher
Publisherはデータ項目のストリームを登録されたSubscriberに発行(publish)します。Publisherは通常はExecutorを使って、データ項目をSubscriberに非同期で発行します。Publisherは各SubscriptionのSubscriberのメソッド呼び出しの厳密な順序付けを保証します。
JDKのSubmissionPublisherを使ってデータ項目のストリームをSubscriberに発行する例
import java.util.concurrent.SubmissionPublisher;
...
//Create Publisher
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
//Register Subscriber
MySubscriber<String> subscriber = new MySubscriber<>();
publisher.subscribe(subscriber);
//Publish items
System.out.println("Publishing Items...");
String[] items={"1", "x", "2", "x", "3", "x"};
Arrays.asList(items).stream().forEach(i -> publisher.submit(i));
publisher.close();
The Subscription
Flow.PublisherとFlow.Subscriberをリンクします。Subscriberは要求された場合のみデータ項目を受信します。Subscriptionを介していつでもキャンセルすることができます。
メソッド(Method) | 説明 |
request | このsubscriptionに対する現在の満たされていない要求に対して、指定されたn件の項目を追加する |
cancel | (最終的に)Subscriberにメッセージの受信を停止させる |
The Processor
SubscriberとPublisherの両方として機能するコンポーネントです。Processorは、PublisherとSubscriberの間に位置し、ストリームを次々と変換します。一つ以上のProcessorが互いに繋がることができます。チェーンの最後のProcessorの結果がSubscriberによって処理されます。JDKは具体的なProcessorを提供しませんので、必要となる任意のProcessorの実装は個々人に任されています。
StringをIntegerに変換するProcessorのサンプル
import java.util.concurrent.Flow.*;
import java.util.concurrent.SubmissionPublisher;
...
public class MyTransformProcessor<T,R> extends SubmissionPublisher<R> implements Processor<T, R> {
private Function function;
private Subscription subscription;
public MyTransformProcessor(Function<? super T, ? extends R> function) {
super();
this.function = function;
}
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(T item) {
submit((R) function.apply(item));
subscription.request(1);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
close();
}
}
Processorを使ってデータストリームを変換するコードのサンプル
import java.util.concurrent.SubmissionPublisher;
...
//Create Publisher
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
//Create Processor and Subscriber
MyFilterProcessor<String, String> filterProcessor = new MyFilterProcessor<>(s -> s.equals("x"));
MyTransformProcessor<String, Integer> transformProcessor = new MyTransformProcessor<>(s -> Integer.parseInt(s));
MySubscriber<Integer> subscriber = new MySubscriber<>();
//Chain Processor and Subscriber
publisher.subscribe(filterProcessor);
filterProcessor.subscribe(transformProcessor);
transformProcessor.subscribe(subscriber);
System.out.println("Publishing Items...");
String[] items = {"1", "x", "2", "x", "3", "x"};
Arrays.asList(items).stream().forEach(i -> publisher.submit(i));
publisher.close();
Back pressure
Subscriberが消費するペースよりもはるかに速いペースでPublisherがデータ項目を生成している場合に、Back Pressure(背圧)がかかります。
Back-Pressure
http://www.reactivemanifesto.org/glossary#Back-Pressure
未処理のデータ項目がバッファリングされているバッファサイズが制限されることがあります。Flow APIは、Back Pressureを合図したり、対処したりするようなAPIを提供しませんが、Back Pressureを処理する実装する上では様々な戦略が考えられます。RxJavaでのBack Pressureの処理方法を確認してください。
Summary
Reactive Programming APIがJDK 9に加わるのは幸先がよいことです。他の多くの製品は製品機能にアクセスするためのReactive Programming APIも提供しています。Flow APIを使うと、プログラマーはリアクティブなプログラムを書き始めることができますが、エコシステムがもっと進化する必要があります。
例えば、リアクティブ・プログラムであってもやはり最終的には、これまでのAPIを使ってデータベースにアクセスする可能性があります。というのも、全てのデータベースがReactive ProgrammingのAPIをサポートしているわけではないからです。つまり、リアクティブ・プログラムが依存するAPIが、リアクティブ・プログラミング・モデルをまだサポートしていない可能性がある、ということです。
References
About the Author
Rahul Srivastavaは以前ApacheでXerces2-Jプロジェクトのコミッタをつとめていました。現在はOracleのApplication Server開発チームの技術スタッフの主要メンバーです。