対象読者
- Java経験者(初心者可)
- RxJava未経験者
- リアクティブプログラミング未経験者
※ ただし、前回までの連載を読んでいる前提です。
ParallelFlowableについて
ParallelFlowableはバージョン2.0.5より新しく追加された機能で、後続のオペレータやSubscriberに通知するデータを複数のタイムライン(公式では「rail(レール)」と呼ばれている)に分配して、各データの処理を異なるレール上で処理を行う機能を持つFlowableの一種です。そのため、1つのデータを受け取って処理を実行している最中に、次のデータを別スレッド上で動くレールにて受け取り処理をすることで、複数の処理を同時に実行する(並列モードにする)ことが可能になります。基本的にはParallelFlowableがデータを一つずつ順番に、それぞれのレールに渡すようになっています。

このParallelFlowableはバージョン2.0の間は実験的(Experimental)な位置づけでしたが、2.1にバージョンアップした段階でベータ(Beta)にステータスが上がりました。ただし、今後フィードバックなどを受けて仕様が変わる可能性もあるので、本番環境など実際のプロダクトとして使うのは安定板が出るまで待った方がいいでしょう。
また、ParallelFlowableはFlowableにしかなくObservableには同等の機能を持ったクラスはありません。これは並列で処理を行っている際に、その中のあるレールの処理が遅くなる場合、バックプレッシャーをかけて通知するデータ数に制限をかけないと、そのレールにて処理をすべきデータが無制限に貯まっていく危険性があるためです。ParallelFlowableでは、データを各レールに振り分ける際にそのレールが受け取るデータ数のリクエストを受け、その分だけ通知した後は次のリクエストが来るまで、それ以上のデータを通知しないようにバックプレッシャーをかけて受け取るデータ数を制限することが必要になります。もし、レールの処理が遅れてデータ数のリクエストが来ていない場合は、そのレールに対してデータの振り分けがスキップされます。
加えて、ParallelFlowableは異なるレール上で処理を行うため、購読するSubscriberも複数になります。そのため、ParallelFlowableのsubscribeメソッドはSubscriberの配列を受け取るようになっています。その際、レールの数とSubscriberの数を一致させないと、通知するSubscriberやデータを受け取るためのレールがないといったことが起こり、エラーになってしまいます。
また、ParallelFlowableからFlowableに変換し直すメソッドもあり、データに関する処理を並列モードで行い、最終的にFlowableに変換して1つのSubscriberにデータを通知することも可能です。ただし、各レールでデータの処理を非同期で行うため、結果となるデータの通知順が元のデータ順と同じになることを保証されなくなることに注意する必要があります。
FlowableにあるtakeやskipなどのいくつかのオペレータはParallelFlowableにはありません。それらのオペレータを使う必要がある場合は、ParallelFlowableに変換する前後で使用しなければいけません。
ParallelFlowableの生成
PralellFlowableを生成するには、データを通知するための元となるFlowableが必要です。そして、そのFlowableを変換して新しいParallelFlowableを生成します。この変換するためのオペレータは2種類あり、1つはParallelFlowableのstaticメソッドであるfromメソッド、もう1つはFlowableのインスタンスメソッドであるparallelメソッドになります。ただし、staticメソッドとインスタンスメソッドの違いはあっても、処理する内容は同じです。

fromメソッド(ParallelFlowableのstaticメソッド)
-
from(Publisher<? extends T> source)
-
from(Publisher<? extends T> source, int parallelism)
-
from(Publisher<? extends T> source, int parallelism, int prefetch)
※PublisherはFlowableが実装しているインターフェース
parallelメソッド(Flowableのインスタンスメソッド)
-
parallel()
-
parallel(int parallelism)
-
parallel(int parallelism, int prefetch)
引数の型 | 引数名 | 説明 |
---|---|---|
Publisher | source | ParallelFlowableに変換されるFlowable。fromメソッドでしか使わない。 |
int | parallelism | データを分配する際のレール数。デフォルト(指定しない場合)では実行環境の論理プロセッサ数。 |
int | prefetch | 元となるFlowableに通知するようにリクエストするデータ数。デフォルト(指定しない場合)では128。 |
まず、ここで重要になるのが引数のparallelismです。レールをいくつに分割するのかを設定する値ですが、この引数を設定しない場合はデフォルトで実行環境の論理プロセッサ数になります。しかしParallelFlowableを購読する場合、レール数と同じ数だけのSubscriberが必要になり、そうでない場合はエラーが通知されます。一方、最終的にParallelFlowableからFlowableに変換する場合は、いくつレールがあっても最終的に1つのレールにマージされるため、明示的な指定をしなくてもレールの数とSubscriberの数が不一致であることに関するエラーが通知されることはありません。実行環境の論理プロセッサ数によってレール数が変わるということは、その環境にとってベストパフォーマンスと考えられるレールの分配がされることになります。この変数の設定はParallelFlowable自身に対して購読するのか、もしくはFlowableに変換するのかによって、明示的に指定するほうがいいのか、デフォルトのままのほうがいいのかが変わってきます。
次に気を付けるべきことは引数のprefetchです。これはParallelFlowableの元となるFlowableがそのParallelFlowableへ通知するようにリクエストされたデータ数で、この引数を指定しない場合は自動的に128が設定されます。observeOnメソッドの場合と同様にリクエストしたデータ数の75%分のデータを通知したら、75%分のデータ数のリクエストを繰り返すようになります。つまり、後続の処理が異なるスレッド上で実行されている場合や、後続の処理が通知スピードより遅い場合、データはキャッシュされることになります。通知データはキャッシュから送られ、リクエストしたデータ数分を超えた場合はエラーになってしまいます。これを意識していないと、例えば元のFlowableに処理待ちのデータを破棄する設定(BackpressureStrategy.DROP
)がしてあり、Subscriber側で1件ずつ通知するようにリクエストを行っていたとしても、通知データはバッファされたデータから通知されるため、ドロップされると想定していた古いデータが通知される可能性があるので注意が必要です。
Schedulerの設定(runOnメソッド)
ここで注意しておかないといけないこととして、ParallelFlowableは自動的に非同期処理を行うわけではないことがあります。Schedulerの指定をしていないParallelFlowableは元のFlowableの処理が実行されているスレッド上でParallelFlowableの全レールの処理が実行されるため、あるデータを通知したら、そのデータを受け取ったレールの処理が終わるまで次のデータを通知せず、そのレールの処理が終わって初めて次のデータを別のレールに通知し、そのレールで処理が終わるまで通知元の処理も待機するといったことを繰り返すようになります。

これでは、後続のオペレータやSubscriberに異なるデータを順番に振り分けているのにも関わらず、パフォーマンスとしては何のメリットもありません。それに対し、ParallelFlowableにSchedulerを設定すると、各レールの処理を異なるスレッド上で実行する並列処理になるため、各レールが元のFlowableや他のレールの処理を待機させることがなくなり、パフォーマンスの向上が期待できるようになります。ただし、その場合は各レールの処理が共有される可変のオブジェクトにアクセスしないといった、非同期処理を行っても問題がない作りになっていることが条件になります。

このことを行うため、ParallelFlowableには各レールの処理を指定したScheduler上で実行させるメソッドとしてrunOnメソッドを用意しています。
runOnメソッド
-
runOn(Scheduler scheduler)
-
runOn(Scheduler scheduler, int prefetch)
引数の型 | 引数名 | 説明 |
---|---|---|
Scheduler | scheduler | 各レールの処理をどうようなスレッド上で行うのかを指定するScheduler。 |
int | prefetch | 各レールが通知するようにリクエストするデータ数。デフォルト(指定しない場合)では128。 |
第2引数のprefetch
は各レールが通知するようにリクエストするデータ数です。この引数がない場合はデフォルトで128になります。データ数をリクエストするということは、データを受け取る側の処理が通知する処理より遅くなった場合、対象のレールに通知されるデータはバッファされ、現在のデータの処理が終わってからようやくバッファされたデータから次のデータを受け取るようになることを意味します。例えば、1つのレールの処理が遅い場合、そのレールが受け取るはずのデータはバッファされ、他のレールはバッファされたデータとは異なるデータを受け取り、次々と処理をしていくようになります。

これを避けるには、runOnメソッドのデフォルトを変更し、1件しかデータを通知しないようにリクエストさせる必要があります。すると、各データはデータを受け取る準備ができたレールにのみ次々とデータを通知するようになります。

ただし、注意しないといけない点もあります。全てのレールの処理が遅くなる場合、上流の処理のほうで処理待ちのデータをバッファすることになるので、そこをうまく調整しないとMissingBackpressureExceptionが発生し、エラー通知されることになります。
ParallelFlowableの購読(subscribeメソッド)
PrallelFlowableを購読する場合、レールの数だけSubscriberを用意しないといけません。レールの数と異なっていると、java.lang.IllegalArgumentExceptionが各Subscriberにエラーとして通知されます。ParallelFlowableを直接subscribeメソッドを使って購読する場合はparallelメソッドを呼ぶ際に引数のレール数(parallelism
)を指定しておくことが大事になります。そうしないと実行環境によってレール数が変わってしまうので、ある環境では問題なく動くものが別の環境だとエラーを通知するようになり、意図した動きをしなくなる可能性が高くなります。
また、ParallelFlowableのsubscribeメソッドの引数はSubscriberの配列を受け取るようになっており、戻り値は返しません。そして、FlowableのsubscribeメソッドのようにDisposableを返すsubscribeメソッドは用意されていません。
ParallelFlowableのsubscribeメソッド
-
subscribe(Subscriber<? super T>[] subscribers)
PrallelFlowableの完了通知は、元となるFlowableが完了を通知すると、購読している全てのSubscriberに対して行われます。しかし、エラー通知の場合は完了の場合と異なり、どこでエラーが発生したかによってエラーの通知が変わります。
ParallelFlowableのエラー通知のポイント
- 元となるFlowableの処理中にエラーが発生した場合
- ParallelFlowableでの処理中にエラーが発生した場合
まず、元となるFlowableの処理中にエラーが発生した場合、発生したエラーオブジェクトと供に全てのSubscriberに対してエラー通知を行います。
public static void main(String[] args) throws Exception { ParallelFlowable<Integer> parallelFlowable = Flowable.range(1, 10) // ParallelFlowableに変換する前に例外を発生させる .doOnNext(data -> { if (data == 3L) { throw new Exception("例外発生"); } }) // ParallelFlowableに変換する .parallel(2) // 非同期で処理を行う .runOn(Schedulers.computation()); // Subscriberの配列の作成 @SuppressWarnings("unchecked") Subscriber<Integer>[] subscribers = new Subscriber[2]; subscribers[0] = new DebugSubscriber<Integer>("No.1"); subscribers[1] = new DebugSubscriber<Integer>("--- No.2"); // 購読する parallelFlowable.subscribe(subscribers); // しばらく待つ Thread.sleep(1000L); }
RxComputationThreadPool-1: No.1: 1 RxComputationThreadPool-2: --- No.2: 2 RxComputationThreadPool-1: No.1: java.lang.Exception: 例外発生 RxComputationThreadPool-2: --- No.2: java.lang.Exception: 例外発生
それに対し、ParallelFlowableに変換した後の処理でエラーが発生した場合は、そのエラーが発生したレールを購読しているSubscriberにのみエラーが通知され、他のSubscriberは正常時と同じようにデータを受け取ります。
public static void main(String[] args) throws Exception { ParallelFlowable<Long> parallelFlowable = Flowable.interval(100L, TimeUnit.MILLISECONDS).take(10) // ParallelFlowableに変換する .parallel(2) // 非同期で処理を行う .runOn(Schedulers.computation()) // 並列モードの後に例外を発生させる .doOnNext(data -> { if (data == 3L) { throw new Exception("例外発生"); } }); // Subscriberの配列の作成 @SuppressWarnings("unchecked") Subscriber<Long>[] subscribers = new Subscriber[2]; subscribers[0] = new DebugSubscriber<Long>("No.1"); subscribers[1] = new DebugSubscriber<Long>("--- No.2"); // 購読する parallelFlowable.subscribe(subscribers); // しばらく待つ Thread.sleep(2000L); }
RxComputationThreadPool-1: No.1: 0 RxComputationThreadPool-2: --- No.2: 1 RxComputationThreadPool-1: No.1: 2 RxComputationThreadPool-2: --- No.2: java.lang.Exception: 例外発生 RxComputationThreadPool-1: No.1: 4 RxComputationThreadPool-1: No.1: 6 RxComputationThreadPool-1: No.1: 8 RxComputationThreadPool-1: No.1: complete
このように、例外が発生してもエラーの通知を受け取るのはエラーが発生したレールのSubscriberのみで、他のSubscriberはそのままデータを受け取り、処理を続けるようになります。また、エラーの通知後に受け取っているデータが全てのデータを受け取っていないことも注目すべきポイントでしょう。これはエラーが発生したレールにも通知データを振り分けているためで、あまり好ましいデータの振り分けではありません。
しかし、RxJava 2.0.8よりレール上で発生した処理に対するデータ通知の対応としてParallelFailureHandlingという構造体(Enum)が追加され、mapメソッドとfilterメソッド、およびdoOnNextメソッドの引数に指定できるようになりました。このParallelFailureHandlingはエラーが発生した際のふるまいをどうするか指定するための構造体で、指定することでエラー発生後のデータの振り分けをある程度は制御することができるようになります。ParallelFailureHandlingには次の種類があります。
ParallelFailureHandling | 説明 |
---|---|
ERROR | レール上の処理でエラーが発生したら、そのレール上のSubscriberにはエラーとして通知し、それ以降のデータは残りのSubscriberが受け取る。 |
STOP | レール上の処理でエラーが発生したら、そのレール上のSubscriberは完了の通知を受け取り、それ以降のデータは残りのSubscriberが受け取る。 |
SKIP | レール上の処理でエラーが発生したら、そのレール上のSubscriberはそのデータの処理をスキップして、それ以降の処理を行う。 |
RETRY | レール上の処理でエラーが発生したら、そのレール上のSubscriberは再度そのデータを使って処理を行う。 |
例えば、先ほどのエラーを発生させるサンプルのdoOnNextメソッド部分にParallelFailureHandlingを設定し、実行したとします。
…略 .doOnNext(data -> { if (data == 3L) { throw new Exception("例外発生"); } }, ParallelFailureHandling.○○○○); // ○○○○にオプションを指定する …略 }
これを実行するとそれぞれ次の結果を得ることになります。
RxComputationThreadPool-1: No.1: 0 RxComputationThreadPool-2: --- No.2: 1 RxComputationThreadPool-1: No.1: 2 RxComputationThreadPool-2: --- No.2: java.lang.Exception: 例外発生 RxComputationThreadPool-1: No.1: 4 RxComputationThreadPool-1: No.1: 5 RxComputationThreadPool-1: No.1: 6 RxComputationThreadPool-1: No.1: 7 RxComputationThreadPool-1: No.1: 8 RxComputationThreadPool-1: No.1: 9 RxComputationThreadPool-1: No.1: complete
※データ「3」でエラーの通知を受け、他のSubscriberがそれ以降のデータを受け取る
RxComputationThreadPool-1: No.1: 0 RxComputationThreadPool-2: --- No.2: 1 RxComputationThreadPool-1: No.1: 2 RxComputationThreadPool-2: --- No.2: complete RxComputationThreadPool-1: No.1: 4 RxComputationThreadPool-1: No.1: 5 RxComputationThreadPool-1: No.1: 6 RxComputationThreadPool-1: No.1: 7 RxComputationThreadPool-1: No.1: 8 RxComputationThreadPool-1: No.1: 9 RxComputationThreadPool-1: No.1: complete
※データ「3」で完了の通知を受け、他のSubscriberがそれ以降のデータを受け取る
RxComputationThreadPool-1: No.1: 0 RxComputationThreadPool-2: --- No.2: 1 RxComputationThreadPool-1: No.1: 2 RxComputationThreadPool-1: No.1: 4 RxComputationThreadPool-2: --- No.2: 5 RxComputationThreadPool-1: No.1: 6 RxComputationThreadPool-2: --- No.2: 7 RxComputationThreadPool-1: No.1: 8 RxComputationThreadPool-2: --- No.2: 9 RxComputationThreadPool-1: No.1: complete RxComputationThreadPool-2: --- No.2: complete
※データ「3」の通知の処理をスキップしている
ParallelFailureHandling.RETRYを指定する場合は、そのエラーが回復可能なエラーでない限り、そのデータの処理を繰り返すことになるため、何度かリトライしたら例外が発生しないようサンプルのコードを変えます。
…略 ParallelFlowable<Long> parallelFlowable = Flowable.rangeLong(0L, 10) // 通知間隔を開けないためにrangeLongに変更 …略 // 並列モードの後に例外を発生させる .doOnNext(new Consumer<Long>() { private int count = 0; @Override public void accept(Long data) throws Exception { if (data == 3L && 3 > count++) { throw new Exception("例外発生"); } } }, ParallelFailureHandling.RETRY); // エラー時の対応 …略
これを実行すると以下のようになり、エラーが発生していたデータ(3)をそのレール上で再度実行し、エラーにならずに処理ができてから後続の処理を行っていることがわかります。
RxComputationThreadPool-1: No.1: 0 RxComputationThreadPool-2: --- No.2: 1 RxComputationThreadPool-1: No.1: 2 RxComputationThreadPool-1: No.1: 4 RxComputationThreadPool-1: No.1: 6 RxComputationThreadPool-1: No.1: 8 RxComputationThreadPool-1: No.1: complete RxComputationThreadPool-2: --- No.2: 3 RxComputationThreadPool-2: --- No.2: 5 RxComputationThreadPool-2: --- No.2: 7 RxComputationThreadPool-2: --- No.2: 9 RxComputationThreadPool-2: --- No.2: complete
ただし、このリトライは回復可能なエラーの場合は有効ですが、回復しないエラーの場合は同じ処理を延々と繰り返すことになるので注意してください。
購読解除(Subscriberからのcancelメソッド/disposeメソッド)
それでは、次にParallelFlowableの購読解除について見ていきましょう。基本的にはsubscribeメソッドがDisposableを戻り値として返さないことより、現状はParellelFlowableの購読解除がSubscriber外から行われないと想定した設計になっていると考えられます。そうなると、購読を解除するにはSubscriber内でcancelメソッド(およびDisposableSubscriberなどの場合はdisopseメソッド)を呼んで購読解除することになります。そして、購読解除が行われるSubscriberはその購読解除を呼び出したSubscriberのみとなり、他のSubscriberはそのまま購読を続けることになります。
@SuppressWarnings("unchecked") public static void main(String[] args) throws Exception { ParallelFlowable<Long> parallelFlowable = Flowable.interval(100L, TimeUnit.MILLISECONDS).take(10) // ParallelFlowableに変換する .parallel(2) // 非同期で処理を行う .runOn(Schedulers.computation()); Subscriber<?>[] subscribers = { // 通常のデバッグ用Subscriber new DebugSubscriber<Long>("No.1"), // データ「3」でキャンセルするSubscriber new CancelOn3Subscriber("--- No.2") }; // 購読する parallelFlowable.subscribe((Subscriber<Long>[]) subscribers); // しばらく待つ Thread.sleep(2000L); } /** データ「3」を受け取ると購読解除するSubscriber */ private static class CancelOn3Subscriber extends DebugSubscriber<Long> { public CancelOn3Subscriber(String name) { super(name); } @Override public void onNext(Long data) { // データが「3」を受け取った際にキャンセルを行う if (data.equals(3L)) { super.dispose(); System.out.println("購読解除しました。 受け取ったデータ=" + data); return; } super.onNext(data); } }
RxComputationThreadPool-1: No.1: 0 RxComputationThreadPool-2: --- No.2: 1 RxComputationThreadPool-1: No.1: 2 購読解除しました。 受け取ったデータ=3 RxComputationThreadPool-1: No.1: 4 RxComputationThreadPool-1: No.1: 5 RxComputationThreadPool-1: No.1: 6 RxComputationThreadPool-1: No.1: 7 RxComputationThreadPool-1: No.1: 8 RxComputationThreadPool-1: No.1: 9 RxComputationThreadPool-1: No.1: complete
実行結果より、購読解除をしたNo.2のSubscriberはそれ以降データを受け取っていませんが、No.1のSubscriberはそれ以降も通知されたデータを受け取り続けていることがわかります。
注意すべき点として、購読解除のふるまいは2.0.7から変更されています。2.0.6までは1つのSubscriberが購読解除をするとParallelFlowable自体が通知を止めてしまい、全てのSubscriberがデータを受け取らないようになっていました。そのため、ParallelFlowableに対して購読解除を行っている場合は、どのバージョンを使っているのかを確認しましょう。RxJavaのバージョンを更新する必要がある場合は、バージョンを上げることでふるまいが変わらないのかの確認が必要になります。また、この並列処理の機能自体はまだベータ版なので、今後も仕様が変わる可能性があることに注意してください。
Flowableへの変換(sequentialメソッド)
ParallelFlowableは異なるレール上で処理を行い、それぞれのレール上のSubscriberに通知を行います。しかし、処理が複数に分岐して同時に実行していても最終的に各レールの通知をマージして、1つのSubscriberで購読したいことがあります。つまり、ParallelFlowableをFlowableに変換するということです。それを実現するため、ParallelFlowableにはsequentialメソッドを用意しています。

sequentialメソッドは1つのレール上の処理でエラーが発生した場合、そのタイミングでエラーを通知してきます。しかし、場合によっては他の正常に処理をしているレールの処理を終えてからエラーの通知を受け取りたい場合もあるかと思います。そのため、エラーの通知を最後にするsequentialDelayErrorメソッドも用意されています。

注意点として、エラー発生後のデータ通知がParallelFailureHandlingの設定の有無で変わることを意識していないと、意図したデータやエラーの通知を受け取れない可能性があります。
また、sequentialDelayErrorメソッドは2.0.7より導入された機能なので、それ以前のバージョンにはないことに注意してください。
sequential/sequentialDelayErrorメソッド
-
rsequential()
-
sequential(int prefetch)
-
sequentialDelayError()
-
sequentialDelayError(int prefetch)
引数の型 | 引数名 | 説明 |
---|---|---|
int | prefetch | 各レールに対して通知するようにリクエストするデータ数。デフォルト(指定しない場合)では128。 |
まとめ
今回はRxJava 2.0.5から新たに追加されたParallelFlowableについて、執筆時点では最新のバージョン2.1.2を使って前回よりも詳しく見てきました。ただし、このParallelFlowabeはまだベータ版の段階なので、クラス名やメソッド名の変更やふるまいが変わる可能性が高く、今のバージョンになるまでに何度かふるまいやAPIの変更が発生しています。今後もいろいろと変わっていく可能性が高いので、プロダクトに導入するのはベータ版が終わり安定版となるまで待ったほうがいいでしょう。