最終更新日:2020/09/02 原本2017-02-23

RxJava(2.x)の便利なオペレータ(結合/ユーティリティ/デバッグ)

RxJavaによるリアクティブプログラミング入門(6)

 この連載はRxJavaを使って、リアクティブプログラミングにおけるポイントやRxJavaが持つ機能について学んでいくことを目的としています。前回はRxJava 2.xの最も基本となるオペレータについて解説しました。今回は前回紹介していない複数のFlowable/Observableを結合するオペレータとユーティリティ系のオペレータ、そしてデバッグ用(ログ出力)に使えるオペレータの中でも代表的なものについて見ていきます。今回もサンプルはFlowableを使って実装していますが、Observableの場合もバックプレッシャーを扱わない限り、基本的にはほぼ同じ使い方になります。また、データを受け取るSubscriberとして、特別なことをしない限り、第5回で作成したDebugSubscriberを使用しています。

対象読者

  • Java経験者(初心者可)
  • RxJava未経験者
  • リアクティブプログラミング未経験者

 ※ただし、前回までの連載を読んでいる前提です。

複数のFlowable/Observableを結合するオペレータ(1)

 複数のFlowable/Observableを結合するオペレータは大きく分けて2つの種類があり、一つは単に結合してそれぞれのデータを通知するもの、もう一つは各Flowable/Observableが通知したデータを組み合わせて新しいデータを生成してから通知するものです。ここではこれらの代表的なオペレータとして次のものを見ていきます。

  • merge
  • combineLatest
  • zip

 この他にも複数のFlowable/Observableを結合するさまざまなオペレータがRxJavaには用意されています。さらにどのようなメソッドがあるのかを知りたい場合は、英語ですが、次のGitHub上にあるRxJavaのWikiを参照してください。

merge

マーブルダイアグラム
マーブルダイアグラム
主なメソッド
  • merge(Publisher/ObservableSource<? extends T> source1, Publisher/ObservableSource<? extends T> source2)
  • merge(Publisher/ObservableSource<? extends T> source1, Publisher/ObservableSource<? extends T> source2, Publisher/ObservableSource<? extends T> source3)
  • merge(Publisher/ObservableSource<? extends T> source1, Publisher/ObservableSource<? extends T> source2, Publisher/ObservableSource<? extends T> source3, Publisher/ObservableSource<? extends T> source4)

 「Publisher/ObservableSource」はFlowableの場合はPublisherになり、Observableの場合はObservableSourceになることを表す。また、PublisherはFlowableが実装しているインターフェースであり、ObservableSourceはObservableが実装しているインターフェースである。

 mergeメソッドは複数のFlowable/Observableが通知するデータを1つのFlowable/Observableを通して通知するオペレータです。このメソッドを使うことで、複数のFlowable/Observableの通知を1つのSubscriber/Observerに購読させることができるようになります。処理を開始する際は引数のFlowable/Observableを同時に実行させ、それぞれがデータを通知するタイミングで結果のFlowable/Observableからデータが通知されます。ただし、複数のFlowable/Observableが同時にデータを通知しても、結果として通知する際は同期が取られシーケンシャルに通知されるようになっています。そして、完了を通知するタイミングは引数の全てのFlowable/Observableが完了する時になります。mergeメソッドは引数に最大4つまでのFlowable/Observableを取りますが、それより多くのFlowable/Observableを結合したい場合は、引数を配列で取るmergeArrayメソッドを使うことで対応することができます。

 ちなみに、引数のFlowable/Observableのどれかがエラーを通知したら、そのタイミングでエラーを通知し処理が終了します。もし、エラーの通知を保留して他の正常なFlowable/Observableを処理してから最後にエラーを通知したい場合は、mergeDelayErrorメソッドを使うことで対応することができます。

サンプル

 次のサンプルではintervalメソッドで生成した2つのFlowableについて、mergeメソッドを使って1つのFlowableにまとめ、そのFlowableを経由してデータを通知するようにしています。また、引数となるそれぞれのFlowableは通知する間隔やデータ数が異なるようになっています。このmergeメソッドで生成されたFlowableを実行すると引数のFlowableを全て同時に実行され、引数のFlowableを持つデータがmergeメソッドの結果となるFlowableから通知されることになります。

merge(source1, source2)のサンプル
public static void main(String[] args) throws Exception {
  // マージ対象
  Flowable<Long> flowable1 =  // (1)
      // 300ミリ秒ごとにデータを通知する 
      Flowable.interval(300L, TimeUnit.MILLISECONDS)
          // 5件まで
          .take(5);
  
  // マージ対象
  Flowable<Long> flowable2 =  // (2)
      // 500ミリ秒ごとにデータを通知する
      Flowable.interval(500L, TimeUnit.MILLISECONDS)
          // 2件まで
          .take(2)
          // 100加算する
          .map(data -> data + 100L);
  
  // 複数のFlowableをマージする
  Flowable<Long> result = Flowable.merge(flowable1, flowable2); // (3)
  
  // 購読する
  result.subscribe(new DebugSubscriber<>());
  
  // しばらく待つ
  Thread.sleep(2000L);
}
  1. intervalメソッドを使って、300ミリ秒ごとに「0」「1」「2」と順に数値を通知するFlowableを生成。takeメソッドを使って5件まで通知させる
  2. intervalメソッドを使って、500ミリ秒ごとに数値を通知するFlowableを生成し、takeメソッドを使って2件まで通知させ、mapメソッドを使い受け取ったデータに「100」を加算したデータを通知させる。結果として500ミリ秒ごとに「100」「101」を通知するFlowableになる
  3. mergeメソッドを使って(1)と(2)のFlowableを1つにまとめ、同時に実行するようにする
実行結果
RxComputationThreadPool-1: 0
RxComputationThreadPool-2: 100
RxComputationThreadPool-1: 1
RxComputationThreadPool-1: 2
RxComputationThreadPool-2: 101
RxComputationThreadPool-1: 3
RxComputationThreadPool-1: 4
RxComputationThreadPool-1: 完了

 実行結果より、マージ元のFlowableが同時に処理が実行されていることがわかります。さらに、(2)のFlowableが完了した後も(1)のデータを通知していることより、全てのFlowableが完了を通知するまで全てのデータが通知されることがわかります。

経過時間と通知されるデータ
経過時間 0 300 500 600 900 1000 1200 1500
(1)のFlowable   0   1 2   3 4
(2)のFlowable     100     101    
結果のデータ   0 100 1 2 101 3 4

複数のFlowable/Observableを結合するオペレータ(2)

combineLatest

マーブルダイアグラム
マーブルダイアグラム
主なメソッド
  • combineLatest(Publisher/ObservableSource<? extends T1> source1, Publisher/ObservableSource<? extends T2> source2, BiFunction<? super T1,? super T2,? extends R> combiner)
  • combineLatest(Publisher/ObservableSource<? extends T1> source1, Publisher/ObservableSource<? extends T2> source2, Publisher/ObservableSource<? extends T3> source3, Function3<? super T1,? super T2,? super T3,? extends R> combiner)

と増えていき、引数となるFlowable/Observableを最大9つまで取るものが用意されています。

 「Publisher/ObservableSource」はFlowableの場合はPublisherになり、Observableの場合はObservableSourceになることを表す。また、PublisherはFlowableが実装しているインターフェースであり、ObservableSourceはObservableが実装しているインターフェースである。

 combineLatestメソッドは、複数のFlowable/Observableが通知するそれぞれのデータを引数の関数型インターフェースに渡すことで新たなデータを生成して通知するオペレータです。通知のタイミングは最初のデータのみ各Flowable/Observableがデータを通知するのを待って、それ以降は各Flowable/Observableがデータを通知するたびに、それまでに通知している最新のデータを使って新しいデータを生成していきます。完了を通知するタイミングは、全ての引数のFlowable/Observableが完了を通知したタイミングで完了するようになります。

 また、引数には結合するFlowable/Observableの他に、それぞれの受け取ったデータをどのように変換し新しいデータを生成するのかを定義する関数型インターフェースを持っています。この関数型インターフェースは受け取るFlowable/Observableの数によって、BiFunction、Function3、Function4、……と変わっていきます。

combinerの実装例(引数のFlowable/Observableが2つの場合)
// 引数のFlowable/Observableが通知したそれぞれのデータを格納したListを生成する
new BiFunction<Long, Long, List<Long>>() {
  
  @Override
  public List<Long> apply(Long data1, Long data2) throws Exception {
    return Arrays.asList(data1, data2);
  }
};

 ちなみに、引数のFlowable/Observableのどれかがエラーを通知したら、そのタイミングでエラーを通知し処理が終了します。もし、エラーの通知を保留して他の正常なFlowable/Observableを処理してから最後にエラーを通知したい場合は、combineLatestDelayErrorメソッドを使うことで対応することができます。

サンプル

 次のサンプルでは、intervalメソッドで生成した2つのFlowableを引数に取るcombineLatestメソッドを使って、元となるFlowableがデータを通知するたびに、その時点で最後に通知したデータを使って、新しいデータを生成し通知しています。今回はそれぞれのFlowableから受け取ったデータを格納したListを通知するようにしています。

combineLatest(source1, source2, combiner)のサンプル
public static void main(String[] args) throws Exception {
  // 結合対象
  Flowable<Long> flowable1 =  // (1)
      // 300ミリ秒ごとにデータを通知する
      Flowable.interval(300L, TimeUnit.MILLISECONDS)
          // 5件まで
          .take(5);
  
  // 結合対象
  Flowable<Long> flowable2 =  // (2)
      // 500ミリ秒ごとにデータを通知する
      Flowable.interval(500L, TimeUnit.MILLISECONDS)
          // 3件まで
          .take(3)
          // 100加算する
          .map(data -> data + 100L);
  
  // 複数のFlowableから受け取ったデータで新しいデータを生成する
  Flowable<List<Long>> result = Flowable.combineLatest(  // (3)
      // 結合するFlowable
      flowable1,
      // 結合するFlowable
      flowable2,
      // 引数より通知されたデータをListに格納して通知
      (data1, data2) -> Arrays.asList(data1, data2));
  
  // 購読する
  result.subscribe(new DebugSubscriber<>());
  
  // しばらく待つ
  Thread.sleep(2000L);
}
  1. intervalメソッドを使って、300ミリ秒ごとに「0」「1」「2」と順に数値を通知するFlowableを生成。takeメソッドを使って5件まで通知させる
  2. intervalメソッドを使って、500ミリ秒ごとに数値を通知するFlowableを生成し、takeメソッドを使って3件まで通知させ、mapメソッドを使い受け取ったデータに「100」を加算したデータを通知させる。結果として500ミリ秒ごとに「100」「101」「102」を通知するFlowableになる
  3. combineLatestメソッドを使って(1)と(2)のFlowableが通知したデータを受け取り、それらのデータを格納したListを、結果として通知する新しいデータとして生成する
実行結果
RxComputationThreadPool-2: [0, 100]
RxComputationThreadPool-1: [1, 100]
RxComputationThreadPool-1: [2, 100]
RxComputationThreadPool-2: [2, 101]
RxComputationThreadPool-1: [3, 101]
RxComputationThreadPool-1: [4, 101]
RxComputationThreadPool-2: [4, 102]
RxComputationThreadPool-2: 完了

 実行結果より、最初のデータのみ両方のFlowableがデータを通知するまで待ち、それ以降は各Flowableがデータを通知するたびに、その時点での引数が通知する最新のデータを使って、結果として通知するデータを生成していることがわかります。また、完了のタイミングは各Flowableのすべてのデータを通知していることより、最後にデータを通知したFlowableが完了したタイミングで実行されていることがわかります。

zip

マーブルダイアグラム
マーブルダイアグラム
主なメソッド
  • zip(Publisher/ObservableSource<? extends T1> source1, Publisher/ObservableSource<? extends T2> source2, BiFunction<? super T1,? super T2,? extends R> zipper)
  • zip(Publisher/ObservableSource<? extends T1> source1, Publisher/ObservableSource<? extends T2> source2, Publisher/ObservableSource<? extends T3> source3, Function3<? super T1,? super T2,? super T3,? extends R> zipper)

と増えていき、引数となるFlowable/Observableを最大9つまで取るものが用意されています。

 「Publisher/ObservableSource」はFlowableの場合はPublisherになり、Observableの場合はObservableSourceになることを表す。また、PublisherはFlowableが実装しているインターフェースであり、ObservableSourceはObservableが実装しているインターフェースである。

 zipメソッドは複数のFlowable/Observableが通知するそれぞれのデータを引数の関数型インターフェースに渡すことで新たなデータを生成して通知するオペレータです。通知のタイミングは各Flowable/Observableのデータがそろうのを待ってから新しいデータを生成するため、通知ペースが遅いFlowable/Observableに合わせることになります。完了を通知するタイミングは、データ数が最も少ないFlowable/Observableが完了したタイミングになります。

 また、引数には、結合するFlowable/Observableの他に、それぞれの受け取ったデータをどのように変換し新しいデータを生成するのかを定義する関数型インターフェースを持っています。この関数型インターフェースは受け取るFlowable/Observableの数によって、BiFunction、Function3、Function4、……と変わっていきます。

zipperの実装例(引数のFlowable/Observableが2つの場合)
// 引数のFlowable/Observableが通知したそれぞれのデータを格納したListを生成する
new BiFunction<Long, Long, List<Long>>() {
  
  @Override
  public List<Long> apply(Long data1, Long data2) throws Exception {
    return Arrays.asList(data1, data2);
  }
};

サンプル

 次のサンプルでは、intervalメソッドで生成した2つのFlowableを引数に取るzipメソッドを使って、元となる各Flowableのデータがそろったタイミングで新しいデータを生成し通知しています。今回はそれぞれのFlowableから受け取ったデータを格納したListを通知するようにしています。

zip(source1, source2, zipper)のサンプル
public static void main(String[] args) throws Exception {
  // 結合対象
  Flowable<Long> flowable1 =  // (1)
      // 300ミリ秒ごとにデータを通知する
      Flowable.interval(300L, TimeUnit.MILLISECONDS)
          // 5件まで
          .take(5);
  
  // 結合対象
  Flowable<Long> flowable2 =  // (2)
      // 500ミリ秒ごとにデータを通知する
      Flowable.interval(500L, TimeUnit.MILLISECONDS)
          // 3件まで
          .take(3)
          // 100加算する
          .map(data -> data + 100L);
  
  // 複数のFlowableから受け取ったデータで新しいデータを生成する
  Flowable<List<Long>> result = Flowable.zip(  // (3)
      // 結合するFlowable
      flowable1,
      // 結合するFlowable
      flowable2,
      // 引数より通知されたデータをListに格納して通知
      (data1, data2) -> Arrays.asList(data1, data2));
  
  // 購読する
  result.subscribe(new DebugSubscriber<>());
  
  // しばらく待つ
  Thread.sleep(2000L);
}
  1. intervalメソッドを使って、300ミリ秒ごとに「0」「1」「2」と順に数値を通知するFlowableを生成。takeメソッドを使って5件まで通知させる
  2. intervalメソッドを使って、500ミリ秒ごとに数値を通知するFlowableを生成し、takeメソッドを使って3件まで通知させ、mapメソッドを使い受け取ったデータに「100」を加算したデータを通知させる。結果として500ミリ秒ごとに「100」「101」「102」を通知するFlowableになる
  3. zipメソッドを使って(1)と(2)のFlowableが通知したデータを受け取り、それらのデータを格納したListを、結果として通知する新しいデータとして生成する
実行結果
RxComputationThreadPool-2: [0, 100]
RxComputationThreadPool-2: [1, 101]
RxComputationThreadPool-2: [2, 102]
RxComputationThreadPool-2: 完了

 実行結果より、引数全てのFlowableの通知データがそろったタイミングで新しいデータを生成し、そのデータを通知していることがわかります。また、完了のタイミングは3件しか生成されていないことから、最もデータ数が少ないFlowableが完了したタイミングで実行されていることがわかります。逆に言うと、データ数が多いFlowableが持つ超過したデータは通知されることはありません。

ユーティリティ系のオペレータ

 今まで基本となるFlowable/Observableの生成からデータの選択や変換をするオペレータ、そして先ほどは複数のFlowable/Observableの結合するオペレータを見てきましたが、それ以外の処理も行いたいことがあります。ここではその他の便利に使えるオペレータをユーティリティ系のオペレータとし代表的なものとして次のものを見ていきます。

  • repeat
  • delay

 この他にもFlowable/Observableを操作するさまざまなユーティリティ系オペレータがRxJavaには用意されています。さらにどのようなメソッドがあるのかを知りたい場合は、英語ですが、次のGitHub上にあるRxJavaのWikiを参照してください。

 またこの中にある「do」で始まるオペレータについては、デバッグ用として後述します。

repeat

マーブルダイアグラム
マーブルダイアグラム
主なメソッド
  • repeat
  • repeat(long times)

 repeatメソッドは元のFlowable/Observableの処理が完了する際にデータの通知を最初から繰り返し行うようにするオペレータです。例えば元のFlowable/Observableが「1」「2」「3」とデータを通知し完了する場合、repeatメソッドを使うことで「1」「2」「3」「1」「2」「3」……と繰り返しデータを通知するようになります。repeatメソッドの引数がないものは完了することなく繰り返しデータを通知し続け、引数があるものは繰り返しの回数を指定できます。例えば、「A」「B」「C」と通知するFlowableに対し、repeat(2)を使うと「A」「B」「C」「A」「B」「C」と通知し、その後に完了を通知します。

 また、引数に0未満の数値を渡すと、IllegalArgumentExceptionが発生し、引数に「0」を渡すと何もデータを通知せず完了だけ通知する空のFlowable/Observableになります。

サンプル

 次のサンプルではjustメソッド生成したFlowableに対し、repeatメソッドを使って、データの通知が2回行われるようにしています。

repeat(times)のサンプル
public static void main(String[] args) {
  Flowable<String> flowable =
      // Flowableの生成
      Flowable.just("A", "B", "C")  // (1)
          // 通知を繰り返す
          .repeat(2);  // (2)
  
  // 購読する
  flowable.subscribe(new DebugSubscriber<>());
}
  1. justメソッドを使って「A」「B」「C」を通知するFlowableを生成
  2. repeatメソッドを使って、データ通知を2回繰り返すようにする
実行結果
  
main: A
main: B
main: C
main: A
main: B
main: C
main: 完了

 実行結果より、データ通知が2回行われていることがわかります。

delay

マーブルダイアグラム
マーブルダイアグラム
主なメソッド
  • delay(long time, TimeUnit unit)

 delayメソッドはFlowable/Observableから受け取ったデータを指定した期間だけ遅らせて通知するオペレータです。遅らせる期間は引数に時間を渡すことで指定できます。また、ここでは紹介しませんが、delayメソッドの中には引数に関数型インターフェースを取るものもあり、その場合は関数型インターフェースから生成されたFlowable/Observableがデータを通知するタイミングで結果のデータ通知を遅らせるように指定することができます。

 ちなみに、delayメソッドとよく似たオペレータとしてdelaySubscriptionメソッドがあります。このオペレータはdelayメソッドとは異なり、データの通知を遅らせるのではなく、指定した期間だけ処理の開始を遅らせ、データは生成したらすぐに通知するようになっています。

サンプル

 次のサンプルでは、justメソッドで生成したFlowableに対し、delayメソッドを使って、2000ミリ秒遅らせてデータを通知するようにしています。また、処理の開始時と購読およびデータ通知時に、確認のためシステム時間を出力しています。

delay(time, unit)のサンプル
public static void main(String[] args) throws Exception {
  // 処理の開始時間の出力
  System.out.println("処理開始: " + System.currentTimeMillis());
  
  Flowable<String> flowable =
      // Flowableの生成
      Flowable.<String> create(emitter -> {  // (1)
        // 購読の開始時間の出力
        System.out.println("購読開始: " + System.currentTimeMillis());
        // データの通知
        emitter.onNext("A");
        emitter.onNext("B");
        emitter.onNext("C");
        // 完了の通知
        emitter.onComplete();
      }, BackpressureStrategy.BUFFER)
          // 通知を遅らせる
          .delay(2000L, TimeUnit.MILLISECONDS);  // (2)
  // 購読する
  flowable.subscribe(data -> {  // (3)
    System.out.println(
        "通知時間: " + System.currentTimeMillis() + ": " + data);
  });
  
  // しばらく待つ
  Thread.sleep(3000L);
}
  1. createメソッドを使って「A」「B」「C」と通知するFlowableを生成。購読開始時にシステム時間を出力する
  2. delayメソッドで通知するタイミングを2000ミリ秒遅らせる
  3. 結果としてデータを受け取った際にその時にシステム時間とデータを出力する
実行結果
処理開始: 1485837554484
購読開始: 1485837555045
通知時間: 1485837557058: A
通知時間: 1485837557058: B
通知時間: 1485837557058: C

 実行結果より、「購読開始」(……5045)と「通知時間」(……7058)の時間差が約2000ミリ秒あることにより、delayメソッドによって通知データが指定した時間だけ遅れて通知されていることがわかります。また、「処理開始」と「購読開始」の時間の差は引数の指定した時間ほど遅れていないので、購読自体に影響を与えているわけではないことがわかります。

デバッグ用のオペレータ(1)

 ユーティリティ系のオペレータの中には「do」で名前が始まるいくつかのオペレータがあります。これらは各通知時に実行され、引数には戻り値がない(void)関数型インターフェースを受け取るようになっています。つまり、これらのオペレータは各通知時に何らかの副作用(オブジェクトの状態を変えたり、生産者―消費者間の外部に影響を与えたりすること)を起こすことを意味します。しかし、RxJavaでは基本的に受けとったデータを使って副作用を起こすことは消費者側で行うことを前提としています。

 しかし、さまざまなオペレータを経由しているFlowable/Observableの場合、途中でデータにアクセスできないと、元のデータがどのように変更しているのかを把握することが難しくなり、また、オペレータ内でデータを使って何らかの処理をしているコードの中で、デバッグのためのログを出力するような実装を入れることは、あまり好ましいことではありません。そこで、この「do」で名前が始まるオペレータを使って、各通知が行われる際にログを出力するようにすることで、ビジネスロジックを行うコードとデバッグを行うためにログを出力するコードを分割することができるようになります。ここでは、このような「do」で名前が始まるメソッドの中から代表的なものとして次のものを見ていきます。

  • doOnNext
  • doOnComplete
  • doOnError
  • doOnSubscribe
  • doOnRequest
  • doOnCancel/doOnDispose

 ちなみに、これらのオペレータをビジネスロジックとして使うことは可能ですが、本来は消費者側で行えることをこれらのオペレータで行うことは単に複雑さを増すことになるため、基本的にはビジネスロジックに影響を与えるような使い方は避けたほうが良いでしょう。

 また、ここでは、これらのオペレータのマーブルダイアグラムを公式のJavaDocのものから変えているので、わかりづらい場合は公式のJavaDocも参照してください。

doOnNext

マーブルダイアグラム
マーブルダイアグラム
メソッド
  • doOnNext(Consumer<? super T> onNext)

 doOnNextメソッドはFlowable/Observableがデータを通知する際に、引数に指定した関数型インターフェースの処理を行わせるメソッドです。実装する、関数型インターフェースのメソッドの引数に通知するデータが渡されるため、どのようなデータを受け取ったのかを確認することが可能です。

doOnComplete

マーブルダイアグラム
マーブルダイアグラム
メソッド
  • doOnComplete(Action onComplete)

 doOnCompleteメソッドはFlowable/Observableが完了を通知する際に、引数に指定した関数型インターフェースの処理を行わせるメソッドです。実装する、関数型インターフェースのメソッドの引数には何も渡されません。

doOnError

マーブルダイアグラム
マーブルダイアグラム
メソッド
  • doOnError(Consumer<? super Throwable> onError)

 doOnErrorメソッドはFlowable/Observableがエラーを通知する際に、引数に指定した関数型インターフェースの処理を行わせるメソッドです。実装する、関数型インターフェースのメソッドの引数には通知するエラーオブジェクトが渡されるため、どのようなエラーオブジェクトを受け取ったのかを確認することが可能です。

doOnSuscribe

マーブルダイアグラム
マーブルダイアグラム
Flowableのメソッド
  • doOnSubscribe(Consumer<? super Subscription> onSubscribe)
Observableのメソッド
  • doOnSubscribe(Consumer<? super Disposable> onSubscribe)

 doOnSubscribeメソッドはFlowable/Observableが購読されて処理を開始する準備ができた際に、引数に指定した関数型インターフェースの処理を行わせるメソッドです。消費者(Subscriber/Observer)のonSubscribeメソッドを呼ぶ際に実行されます。実装する、関数型インターフェースのメソッドの引数にFlowableの場合はSubscription、Observableの場合はDisposableが渡され、これが最終的にSubscriberのonSubscribeメソッドに渡されます。

doOnRequest

マーブルダイアグラム
マーブルダイアグラム
メソッド
  • doOnRequest(LongConsumer onRequest)

 doOnRequestメソッドはFlowableがデータ数をリクエストされた際に、引数に指定した関数型インターフェースの処理を行わせるメソッドです。実装する、関数型インターフェースのメソッドの引数には、リクエストしたデータ数が渡されるので、リクエストしたデータ数を確認することが可能です。また、このメソッドはバックプレッシャー機能がないObservableにはありません。

デバッグ用のオペレータ(2)

doOnCancel/doOnDispose

マーブルダイアグラム
マーブルダイアグラム
Flowableのメソッド
  • doOnCancel(Action onCancel)
Observableのメソッド
  • doOnDispose(Action onDispose)

 doOnCancelメソッドはFlowableの購読が解除された際に、doOnDisposeメソッドはObservableの購読が解除された際に、引数に指定した関数型インターフェースの処理を行わせるメソッドです。ただし、doOnCancel/doOnDisposeメソッドは途中で購読解除されたのではなく、完了やエラーで終了した場合は実行されないので注意が必要です。また、実装する、関数型インターフェースのメソッドの引数には何も渡されません。

「do」の名前で始まるオペレータを使ったサンプル

正常終了時のサンプル

 次のサンプルではintervalメソッドで生成したFlowableに対し、takeメソッドを使って3件まで出力するようにしています。そのFlowableに対し、各通知に対する「do」で名前が始まるメソッドを使って通知時のログを出力するようにしています。

 また、データを受け取り処理をするSubscriberでは、1件ずつデータを通知するようにリクエストをし、各通知を受ける度にこちらでもログを出力するようにしています。

サンプル
public static void main(String[] args) throws Exception {
  Flowable.interval(500L, TimeUnit.MILLISECONDS)
      // 3件まで
      .take(3)
      // データ通知時のログ
      .doOnNext(
          data -> System.out.println("doOnNext: data=" + data))
      // 完了時のログ
      .doOnComplete(() -> System.out.println("doOnComplete"))
      // エラー時のログ
      .doOnError(
          error -> System.out.println("doOnError: error=" + error))
      // 購読開始時のログ
      .doOnSubscribe(
          subscription -> System.out.println("doOnSubscribe"))
      // データ数のリクエスト時のログ
      .doOnRequest(
          size -> System.out.println("doOnRequest: size=" + size))
      // 購読解除時のログ
      .doOnCancel(() -> System.out.println("doOnCancel"))
      // 購読する
      .subscribe(new Subscriber<Long>() {
        
        private Subscription subscription;
        
        @Override
        public void onSubscribe(Subscription subscription) {
          System.out.println("購読開始");
          this.subscription = subscription;
          subscription.request(1L);
        }
        
        @Override
        public void onNext(Long data) {
          System.out.println("データ=" + data);
          subscription.request(1L);
        }
        
        @Override
        public void onError(Throwable error) {
          System.out.println("エラー=" + error);
        }
        
        @Override
        public void onComplete() {
          System.out.println("完了");
        }
      });
  
  // しばらく待つ
  Thread.sleep(2000L);
}
実行結果
doOnSubscribe
購読開始
doOnRequest: size=1
doOnNext: data=0
データ=0
doOnRequest: size=1
doOnNext: data=1
データ=1
doOnRequest: size=1
doOnNext: data=2
データ=2
doOnRequest: size=1
doOnComplete
完了

 実行結果より、まずdoOnSubscribeメソッドが実行された後にSubscriberのonSubscribeメソッドが呼ばれていることがわかります。そして、そこでデータ数のリクエストを行うとdoOnRequestメソッドが実行されています。そして、データを通知する際にdoOnNextメソッドが呼ばれ、その後にSubscriberがデータを受け取り、SubscriberのonNextメソッドを実行していることがわかります。それを繰り返し、全てのデータを通知し終えたら、doOnCompleteメソッドが実行され、SubscriberのonCompleteメソッドが実行されていることがわかります。

 また、doOnCancelメソッドが実行されていないことから、完了時にはdoOnCancelメソッドが呼ばれないことがわかります。

購読解除時のサンプル

 次のサンプルではintervalメソッドで生成したFlowableに対し、takeメソッドで5件まで出力するようにしています。そのFlowableに対し、各通知に対する「do」で名前が始まるメソッドを使って通知時のログを出力するようにしています。ただし、途中で購読を解除するようにしています。

サンプル
public static void main(String[] args) throws Exception {
  Disposable disposable =
      Flowable.interval(500L, TimeUnit.MILLISECONDS)
          // 5件まで
          .take(5)
          // データ通知時のログ
          .doOnNext(
              data -> System.out.println("doOnNext: data=" + data))
          // 完了時のログ
          .doOnComplete(() -> System.out.println("doOnComplete"))
          // エラー時のログ
          .doOnError(error -> System.out
              .println("doOnError: error=" + error))
          // 購読開始時のログ
          .doOnSubscribe(
              subscription -> System.out.println("doOnSubscribe"))
          // データ数のリクエスト時のログ
          .doOnRequest(size -> System.out
              .println("doOnRequest: size=" + size))
          // 購読解除時のログ
          .doOnCancel(() -> System.out.println("doOnCancel"))
          // 購読する
          .subscribeWith(new DebugSubscriber<>());
  
  // しばらく待った後に購読を解除する
  Thread.sleep(1000L);
  disposable.dispose();
  
  // しばらく待つ
  Thread.sleep(2000L);
}
実行結果
doOnSubscribe
doOnRequest: size=9223372036854775807
doOnNext: data=0
RxComputationThreadPool-1: 0
doOnNext: data=1
RxComputationThreadPool-1: 1
doOnCancel

 実行結果より処理の途中で明示的に購読解除を行うことでdoOnCancelメソッドが実行され、それ以降の通知が行われていないことがわかります。また、今回はSubscriberとしてDisposableSubscriberを実装したDebugSubscriberを使っているため、最初にLong.MAX_VALUEをrequestメソッドで渡しており、最初だけdoOnRequestメソッドが実行されていることがわかります。

まとめ

 今回はFlowableやObservableの便利なオペレータについて見ていきました。ここで見たのは代表的なもので、ここで紹介したもの以外にも数多くのオペレータが用意されています。RxJavaには他にもさまざまな機能を用意していますが、取りあえずは今まで見てきたオペレータだけでも多くのことができるようになります。RxJavaを学ぶには、まずはさまざまなオペレータを簡単なサンプルで試してみて、マーブルダイアグラムと比較するのが良いかと思います。