https://community.oracle.com/docs/DOC-1006738
リアクティブ・プログラミング(Reactive Programming)は非同期データ・アイテムのストリームを処理するものです。アプリケーションはデータ・アイテムが発生すると、そのデータ・アイテムに反応します。Rahul Srivastavaによるこの記事で、JDK 9 Flow APIを使った例をご紹介します。
What is Reactive Programming ?
Reactive Programmingとは、非同期データ項目のストリームを処理するものです。ここで、アプリケーションはデータ項目が発生するとそのデータに対応します。データストリームは基本的に時系列で発生するデータ項目の並び(シーケンス)です。このモデルは、データをストリームとして処理するため、インメモリデータの反復処理と比較するとメモリ効率がよいものです。Reactive Programmingモデルでは、Publisher(パブリッシャ)とSubscriber(サブスクライバ)が存在します。Publisherはデータストリームを発行(publish)し、Subscriberは非同期で当該データストリームを購読(subscribe)します。
このモデルは、Processorを使ってストリーム上で動作するより高い高階関数を提供するメカニズムも提供します。Processorはデータストリームを変換します。この際、PublisherやSubscriberを変更する必要はありません。Processor (もしくはProcessorのチェーン)はPublisherとSubscriberの間に位置し、データストリームを次々変換します。PublisherとSubscriberはデータストリームに発生する変換とは無関係です。

Why Reactive Programming ?
- よりシンプルなコードで、より可読性が高い
- ボイラープレートを取り除き、ビジネスロジックに集中できる
- 低レベルのスレッド、同期、並行性の問題を取り除くことができる
- ストリーム処理はメモリ効率がよい
- このモデルを適用すると、ほとんど全ての種類の問題を解決することができる
JDK 9 Flow API
JDK 9のFlow APIはデファクトスタンダードのReactive Streams Specificationに対応します。Reactive Streams SpecificationはReactive Programmingを標準化するイニシアティブの一つです。Reactive Streams Specification既にいくつかの実装がReactive Streams Specificationをサポートしています。
http://www.reactive-streams.org/
Reactive Streams 1.0.0 is here!
http://www.reactive-streams.org/announce-1.0.0

Flow API Interfaces
Flow API (とReactive Streams API)は、ある点では、Iteretor(イテレータ)やObserver(オブザーバ)パターンからのアイデアの組み合わせです。Iteretorは、アプリケーションがソースからアイテムを引っ張るというプル・モデルです。Observerは、ソースからの項目をアプリケーションにプッシュするプッシュ・モデルです。Flow APIを使うと、アプリケーションは最初にN個の項目を要求し、その後Publisherは最大でN
@FunctionalInterface
public
static
interface
Flow.Publisher<T> {
public
void
subscribe(Flow.Subscriber<?
super
T> subscriber);
}
public
static
interface
Flow.Subscriber<T> {
public
void
onSubscribe(Flow.Subscription subscription);
public
void
onNext(T item) ;
public
void
onError(Throwable throwable) ;
public
void
onComplete() ;
}
public
static
interface
Flow.Subscription {
public
void
request(
long
n);
public
void
cancel() ;
}
public
static
interface
Flow.Processor<T,R>
extends
Flow.Subscriber<T>, Flow.Publisher<R> {
}
個の項目をSubscriberにプッシュします。それゆえ、PullプログラミングモデルとPushプログラミングモデルの組合せなのです。
The Subscriber
SubscriberはコールバックのためにPublisherを購読します。データ項目はリクエストがない限りSubscriberにプッシュされませんが、複数の項目がリクエストされる可能性があります。指定されたSubscriptionに対するSubscriberのメソッド呼び出しは厳密に順序付けされます。アプリケーションは、Subscriberが利用可能な、以下のコールバックに反応することができます。コールバック(Callback) | 説明 |
---|---|
onSubscribe | 指定されたSubscriptionのための、任意の他のSubscriberメソッドを呼び出す前に呼び出されるメソッド |
onNext | Subscriptionの次の項目を伴って呼び出されるメソッド |
onError | PublisherもしくはSubscriptionによってリカバリできないエラーが発生したときに呼び出されるメソッド。その後、Subscriptionは他のSubscriberメソッドを呼び出さない。 PublisherがSubscriberにデータ項目を発行できないというエラーに遭遇した場合、SubscriberはonErrorを受け取り、その後それ以上のメッセージを受信しない。 |
onComplete | エラーによって未終了のSubscriptionのために追加のSubscriberメソッド呼び出しが発生しないことがわかっている場合に呼び出されるメソッド。このあと、別のSubscriberメソッドはSubscriptionから呼び出されることはない。 後続のメッセージがSubscriberに発行されないことがわかっている場合、SubscriberはonCompleteを受け取る。 |
Subscriberのサンプル
import
java.util.concurrent.Flow.*;
...
public
class
MySubscriber<T>
implements
Subscriber<T> {
private
Subscription subscription;
@Override
public
void
onSubscribe(Subscription subscription) {
this
.subscription = subscription;
subscription.request(
1
);
//a value of Long.MAX_VALUE may be considered as effectively unbounded
}
@Override
public
void
onNext(T item) {
System.out.println(
"Got : "
+ item);
subscription.request(
1
);
//a value of Long.MAX_VALUE may be considered as effectively unbounded
}
@Override
public
void
onError(Throwable t) {
t.printStackTrace();
}
@Override
public
void
onComplete() {
System.out.println(
"Done"
);
}
}
The Publisher
Publisherはデータ項目のストリームを登録されたSubscriberに発行(publish)します。Publisherは通常はExecutorを使って、データ項目をSubscriberに非同期で発行します。Publisherは各SubscriptionのSubscriberのメソッド呼び出しの厳密な順序付けを保証します。JDKのSubmissionPublisherを使ってデータ項目のストリームをSubscriberに発行する例
import
java.util.concurrent.SubmissionPublisher;
...
//Create Publisher
SubmissionPublisher<String> publisher =
new
SubmissionPublisher<>();
//Register Subscriber
MySubscriber<String> subscriber =
new
MySubscriber<>();
publisher.subscribe(subscriber);
//Publish items
System.out.println(
"Publishing Items..."
);
String[] items={
"1"
,
"x"
,
"2"
,
"x"
,
"3"
,
"x"
};
Arrays.asList(items).stream().forEach(i -> publisher.submit(i));
publisher.close();
The Subscription
Flow.PublisherとFlow.Subscriberをリンクします。Subscriberは要求された場合のみデータ項目を受信します。Subscriptionを介していつでもキャンセルすることができます。メソッド(Method) | 説明 |
---|---|
request | このsubscriptionに対する現在の満たされていない要求に対して、指定されたn件の項目を追加する |
cancel | (最終的に)Subscriberにメッセージの受信を停止させる |
The Processor
SubscriberとPublisherの両方として機能するコンポーネントです。Processorは、PublisherとSubscriberの間に位置し、ストリームを次々と変換します。一つ以上のProcessorが互いに繋がることができます。チェーンの最後のProcessorの結果がSubscriberによって処理されます。JDKは具体的なProcessorを提供しませんので、必要となる任意のProcessorの実装は個々人に任されています。StringをIntegerに変換するProcessorのサンプル
Processorを使ってデータストリームを変換するコードのサンプル
import
java.util.concurrent.Flow.*;
import
java.util.concurrent.SubmissionPublisher;
...
public
class
MyTransformProcessor<T,R>
extends
SubmissionPublisher<R>
implements
Processor<T, R> {
private
Function function;
private
Subscription subscription;
public
MyTransformProcessor(Function<?
super
T, ?
extends
R> function) {
super
();
this
.function = function;
}
@Override
public
void
onSubscribe(Subscription subscription) {
this
.subscription = subscription;
subscription.request(
1
);
}
@Override
public
void
onNext(T item) {
submit((R) function.apply(item));
subscription.request(
1
);
}
@Override
public
void
onError(Throwable t) {
t.printStackTrace();
}
@Override
public
void
onComplete() {
close();
}
}
import
java.util.concurrent.SubmissionPublisher;
...
//Create Publisher
SubmissionPublisher<String> publisher =
new
SubmissionPublisher<>();
//Create Processor and Subscriber
MyFilterProcessor<String, String> filterProcessor =
new
MyFilterProcessor<>(s -> s.equals(
"x"
));
MyTransformProcessor<String, Integer> transformProcessor =
new
MyTransformProcessor<>(s -> Integer.parseInt(s));
MySubscriber<Integer> subscriber =
new
MySubscriber<>();
//Chain Processor and Subscriber
publisher.subscribe(filterProcessor);
filterProcessor.subscribe(transformProcessor);
transformProcessor.subscribe(subscriber);
System.out.println(
"Publishing Items..."
);
String[] items = {
"1"
,
"x"
,
"2"
,
"x"
,
"3"
,
"x"
};
Arrays.asList(items).stream().forEach(i -> publisher.submit(i));
publisher.close();
Back pressure
Subscriberが消費するペースよりもはるかに速いペースでPublisherがデータ項目を生成している場合に、Back Pressure(背圧)がかかります。Back-Pressure未処理のデータ項目がバッファリングされているバッファサイズが制限されることがあります。Flow APIは、Back Pressureを合図したり、対処したりするようなAPIを提供しませんが、Back Pressureを処理する実装する上では様々な戦略が考えられます。RxJavaでのBack Pressureの処理方法を確認してください。
http://www.reactivemanifesto.org/glossary#Back-Pressure
Summary
Reactive Programming APIがJDK 9に加わるのは幸先がよいことです。他の多くの製品は製品機能にアクセスするためのReactive Programming APIも提供しています。Flow APIを使うと、プログラマーはリアクティブなプログラムを書き始めることができますが、エコシステムがもっと進化する必要があります。例えば、リアクティブ・プログラムであってもやはり最終的には、これまでのAPIを使ってデータベースにアクセスする可能性があります。というのも、全てのデータベースがReactive ProgrammingのAPIをサポートしているわけではないからです。つまり、リアクティブ・プログラムが依存するAPIが、リアクティブ・プログラミング・モデルをまだサポートしていない可能性がある、ということです。
References
- Download JDK 9
https://jdk9.java.net/ - JDK 9 Flow API javadoc
http://download.java.net/java/jdk9/docs/api/index.html?java/util/concurrent/Flow.html - Reactive Streams Specification
http://www.reactive-streams.org/ - Reactive Streams API javadoc
http://www.reactive-streams.org/reactive-streams-1.0.0-javadoc/ - The Reactive Manifesto
http://www.reactivemanifesto.org/
0 件のコメント:
コメントを投稿