最終更新日:2020/09/02 原本2017-01-24

RxJava(2.x)の最初に知っておいてもらいたいオペレータ

RxJavaによるリアクティブプログラミング入門(5)

 この連載はRxJavaを使って、リアクティブプログラミングにおけるポイントやRxJavaが持つ機能について学んでいくことを目的としています。前回はRxJava 2.xを使って簡単なサンプルを実装しました。今回はFlowableやObservableの生成、通知するデータの選別や変換などを行い、新たなFlowableやObservableなどを生成するオペレータについて見ていきます。今回はRxJavaを始めるにあたって、まず知っておいてもらいたいオペレータについてRxJava 2.xで用意されている代表的なものを見ていきます。

対象読者

  • Java経験者(初心者可)
  • RxJava未経験者
  • リアクティブプログラミング未経験者

 ※ただし、前回までの連載を読んでいる前提です。

RxJavaのオペレータとは

 RxJavaのオペレータとは、前述したように新たにFlowableやObservableを生成したり、生成したFlowableやObservableが通知するデータの選別や変換などを行ったデータを通知する新たなFlowableやObservableなどを生成したりするメソッドのことです。オペレータの結果として生成されるものは、基本的にはFlowableから呼ばれればFlowableで、Observableから呼ばれればObservableを生成します。例外として、最終的に通知するデータが1件以内の場合、戻り値が今回はまだ紹介しないSingleやMaybeやCompletableになることもありますが、基本的な考え方は同じです。この戻り値がFlowable/Observableを返す性質からメソッドをつなげていくことで、データがいくつかのオペレータの処理を通過し消費者に対して使いやすくなったデータを通知する最終的なFlowable/Observableを生成できます。

 また、実装でもオペレータをつなげていくメソッドチェインで、実装を簡潔にすることも可能です。例えば、次のサンプルはjustメソッドを使って引数のデータを通知するFlowableを生成し、filterメソッドを使って、そのFlowableが通知するデータを偶数のみにし、mapメソッドを使って通知するデータを10倍にしているFlowableをメソッドチェインを使って実装しています。

メソッドチェインの例
public static void main(String[] args) {
  Flowable<Integer> result = Flowable
      // 引数のデータを通知するFlowableを生成
      .just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
      // 偶数のデータのみを通知する
      .filter(data -> data % 2 == 0)
      // 通知するデータを10倍にする
      .map(data -> data * 10);
  
  // 購読する
  result.subscribe(data -> System.out.println(data));
}

 これを実行すると次の結果が出力されます。

実行結果
20
40
60
80
100

 これをメソッドチェインを使わずに実装すると次のようになります。

public static void main(String[] args) {
  // 引数のデータを通知するFlowableを生成
  Flowable<Integer> flowable1 = Flowable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
  // 偶数のデータのみを通知する
  Flowable<Integer> flowable2 = flowable1.filter(data -> data % 2 == 0);
  // 通知するデータを10倍にする
  Flowable<Integer> result = flowable2.map(data -> data * 10);
  
  // 購読する
  result.subscribe(data -> System.out.println(data));
}

 このようにメソッドチェインを使わないと、途中で次のメソッドを呼ぶためだけの不要なインスタンスを定義する必要が出てきてしまいます。しかし、メソッドチェインを使うとこのような不要なインスタンスを生成する必要がなくなり、変数名を間違えてしまったりするようなことがなくなります。また、元のデータを最終的にどうしたいのかも、実装の一連の流れから分かりやすくなるメリットもあります。

 また、一見するとデザインパターンの一つであるビルダーパターンのように見えますが、ビルダーパターンと異なり、オペレータを設定した時点でオペレータの処理を適用した新たなFlowableが生成されています。さらにこのことは、オペレータをつなげる順序によってどのようなデータが通知されるのかにも影響し、かつパフォーマンスにも影響します。例えば、10倍した後に偶数かどうかを判定するとすべてのデータが偶数になりますし、仮に10倍せず違う処理をしてデータのフィルターした場合、意図したようにデータのフィルターができたとしても、最終的に通知されない不要なデータに対して処理を行っていることになり、それがコストのかかる処理の場合は無駄にコストをかけていることになります。そのためRxJavaでメソッドチェインを使う際はオペレータの順番について何が効率的なのかを意識しないといけません。

 加えて、RxJavaのオペレータの多くは引数に関数型インターフェースを受け取るようになっています。関数型インターフェースとは簡単に説明すると、実装すべきメソッドを1つだけ持つインターフェースです。この関数型インターフェースを実装するのにJava 8からラムダ式という関数型インターフェースの実装を簡易化した記述ができるようになっています。ここではラムダ式については特に説明しませんが、ラムダ式を使うと簡潔に実装ができ、かつ可読性も上がることが多いです。RxJavaの多くのオペレータは引数に関数型インターフェースを受け取るようになっているので、ラムダ式を使えるようにした方がよいでしょう。また、Java 8が使えない環境の場合、retrolambdaなどのラムダ式を古いJavaのバージョンでも使えるようにするライブラリもあるので、可能ならばラムダ式を使って実装したほうが良いでしょう。

 では、今回のサンプルが何を行っているのかについて細かく見ていきましょう。まずjustメソッドを使って引数のデータである次のデータを通知するFlowableを生成しています。

通知するデータ
1
2
3
4
5
6
7
8
9
10

 このFlowableに対しfilterメソッドを使って、偶数のデータのみを通知するようにしています。filterメソッドの引数にはRxJavaのPredicateの関数型インターフェースを受け取るようになっており、どのデータを通知するのかを判別しています。また、このPredicateはJava 8のPredicateではないので、パッケージがio.reactivex.functionsになっているのと、Exceptionをthrowできるようになっていることに注意してください。

filterメソッド
filter(Predicate<? super T> predicate)
io.reactivex.functions.Predicateの定義
public interface Predicate<T> {
  boolean test(T t) throws Exception;
}

 このfilterメソッドのPredicateはどのデータを通知するのかを判定する関数型インターフェースで、実装すべきメソッドとしてtestメソッドを持っています。このtestメソッドは引数に通知するデータを受け取り、そのデータを通知するかどうかを判定し、通知するならtrueを、通知しないならfalseを返すように定義します。今回のサンプルのfilterメソッドでは偶数なら通知するように実装しています。

偶数のみを通知するようにする実装
.filter(data -> data % 2 == 0)

 このサンプルはラムダ式を使って実装していますが、これをラムダ式を使わずに実装すると次のようになります。

ラムダ式を使わない場合
.filter(new Predicate<Integer>() {
  
  @Override
  public boolean test(Integer data) throws Exception {
    return data % 2 == 0;
  }
})

 つまり、testメソッドで通知データであるdataを受け取り、それが偶数ならtrueを返すようにしています。そうすることで、ここでは結果がtrueになるデータのみが通知されるようになり、このfilterメソッドから生成されたFlowableは次のデータを通知します。

通知するデータ
2
4
6
8
10

 そして、このFlowableに対しmapメソッドを使って、通知するデータを10倍するようにしています。mapメソッドの引数にはRxJavaのFunctionの関数型インターフェースを受け取るようになっており、受け取ったデータをどのように変換するのかを定義しています。また、このFunctionはJava 8のFunctionではないので、パッケージがio.reactivex.functionsになっているのと、Exceptionをthrowできるようになっていることに注意してください。

mapメソッド
map(Function<? super T, ? extends R> mapper)
io.reactivex.functions.Functionの定義
public interface Function<T, R> {
  R apply(T t) throws Exception;
}

 このmapメソッドのFunctionは受け取ったデータをどのように変換して通知するのかを定義する関数型インターフェースで、実装すべきメソッドとしてapplyメソッドを持っています。このapplyメソッドは引数に通知するデータを受け取り、そのデータから新たなデータを生成して返します。そしてこの戻り値のデータが結果として通知されるデータとなります。このサンプルのmapメソッドでは受け取ったデータを10倍して返すように実装しています。

10倍したデータを通知するようにする実装
.map(data -> data * 10)

 このサンプルはラムダ式を使って実装していますが、これをラムダ式を使わずに実装すると次のようになります。

ラムダ式を使わない場合
.map(new Function<Integer, Integer>() {
  
  @Override
  public Integer apply(Integer data) throws Exception {
    return data * 10;
  }
})

 つまり、applyメソッドで通知データであるdataを受け取り、それを10倍したデータを返すようにしています。そして、その戻り値が通知されるデータとなります。このmapメソッドから生成されたFlowableは次のデータを通知します。

通知するデータ
20
40
60
80
100

 それでは、簡単にオペレータの実装例を見たところで、RxJavaにはどのようなオペレータが用意されているのか、主なオペレータについて見ていきましょう。今回のサンプルはFlowableを使って実装していますが、Observableの場合もバックプレッシャーを扱わない限り、基本的にはほぼ同じ使い方になります。また、今後のサンプルではデバッグ用のSubscriberとして次のものを用意しています。

DebugSubscriber.java
/** サンプル用のSubscriber */
public class DebugSubscriber<T> extends ResourceSubscriber<T> {
  
  private String label;
  
  public DebugSubscriber() {
    super();
  }
  
  public DebugSubscriber(String label) {
    super();
    this.label = label;
  }
  
  @Override
  public void onNext(T data) {
    // onNextメソッドの呼び出し時に出力
    String threadName = Thread.currentThread().getName();
    if (label == null) {
      System.out.println(threadName + ": " + data);
    } else {
      System.out.println(threadName + ": " + label + ": " + data);
    }
  };
  
  @Override
  public void onError(Throwable throwable) {
    // onErrorメソッドの呼び出し時に出力
    String threadName = Thread.currentThread().getName();
    if (label == null) {
      System.out.println(threadName + ": エラー = " + throwable);
    } else {
      System.out.println(threadName + ": " + label + ": エラー = " + throwable);
    }
  }
  
  @Override
  public void onComplete() {
    // onCompleteメソッドの呼び出し時に出力
    String threadName = Thread.currentThread().getName();
    if (label == null) {
      System.out.println(threadName + ": 完了");
    } else {
      System.out.println(threadName + ": " + label + ": 完了");
    }
  }
}

 サンプルでは、Subscriberで特殊なことをしない限りは、このDebugSubscriberを使っています。

Flowable/Observableを生成するオペレータ

 Flowable/Observableを生成するオペレータには前回見たcreateメソッドの他にも、さまざまなオペレータがあります。ここでは前回のサンプルで使ったcreateメソッドを除いた代表的なオペレータとして次のものを見ていきます。

  • just
  • fromArray/fromIterable
  • interval
  • timer

 この他にもFlowable/Observableを生成するさまざまなオペレータがRxJavaには用意されています。さらにどのようなメソッドがあるのかを知りたい場合は、英語ですが、次のGitHub上にあるRxJavaのWikiを参照してください。

just

マーブルダイアグラム
マーブルダイアグラム
主なメソッド
  • just(T item)
  • just(T item1, T item2)
  • just(T item1, T item2, T item3)

と増えていき最大10個の引数を持つメソッドが用意されています。

 justメソッドは引数に渡したデータを通知するFlowable/Observableを生成するオペレータです。引数は最大10個まで指定することが可能で、左から順にデータが通知されます。すべてのデータを通知したら完了(onComplete)を通知します。

サンプル

 次のサンプルでは、justメソッドの引数に渡した「A」「B」「C」「D」「E」を順に通知するFlowableを生成しています。すべてのデータを通知し終えたら完了(onComplete)を通知します。

justメソッドのサンプル
public static void main(String[] args) {
  
  // 引数のデータを順に通知するFlowableの生成
  Flowable<String> flowable = Flowable.just("A", "B", "C", "D", "E");  // ①
  
  // 購読開始
  flowable.subscribe(new DebugSubscriber<>());
}
  1. justメソッドを使って、引数のデータを通知するFlowableを生成する。
実行結果
main: A
main: B
main: C
main: D
main: E
main: 完了

 実行結果より、引数のデータを左から順にSubscriberに通知し、すべてのデータを通知後に完了を通知していることが分かります。

fromArray/fromIterable

マーブルダイアグラム(fromArray)
マーブルダイアグラム(fromArray)
主なメソッド
  • fromArray(T... items)
  • fromIterable(Iterable<? extends T> source)

 fromArrayメソッドは引数に指定した配列の要素を、fromIterableメソッドはListなどのIterableの要素を順にデータとして通知するFlowable/Observableを生成するオペレータです。生成したFlowable/Observableはすべてのデータを通知し終えたら、完了を通知します。また、fromArrayメソッドの引数は配列の要素を直接指定することもできます。

サンプル

 次のサンプルでは、配列の要素を順に通知するFlowableを生成しています。今回は配列のインスタンスをつくらず要素を直接引数に指定しています。配列のすべての要素をデータとして通知し終えたら完了(onComplete)を通知します。

fromArray(items)のサンプル
public static void main(String[] args) {
  
  // 配列のデータを順に通知するFlowableの生成
  Flowable<String> flowable = Flowable.fromArray(
      "A", "B", "C", "D", "E" );  // ①
  
  // 購読開始
  flowable.subscribe(new DebugSubscriber<>());
}
  1. 引数の配列の要素を順に通知するFlowableを生成する。
実行結果
main: A
main: B
main: C
main: D
main: E
main: 完了

 実行結果より、引数の配列の要素が順に通知し、すべてのデータを通知後に完了を通知していることが分かります。

interval

マーブルダイアグラム
マーブルダイアグラム
主なメソッド
  • interval(long time, TimeUnit unit)
  • interval(long time, TimeUnit unit, Scheduler scheduler)
  • interval(long initialDelay, long time, TimeUnit unit)
  • interval(long initialDelay, long time, TimeUnit unit, Scheduler scheduler)

 intervalメソッドは指定した通知間隔(インターバル)で0から始まるLong値のデータを通知するFlowable/Observableを生成するオペレータです。通知されるデータは「0」「1」「2」と順に通知されていきます。このintervalメソッドで生成されたFlowable/ObservableはデフォルトでSchedulers.computation()のScheduler上で実行され、呼び出し元のスレッドとは異なるスレッド上で実行されます。このSchedulerを変更するには引数にSchedulerを受け取るメソッドがあり、それを使うことでデフォルトのSchedulerを変更できます。

 また、最初の通知データである「0」を通知するタイミングは、デフォルトでは処理を開始してすぐではなく、指定したインターバルだけ開けて通知します。しかし、引数に最初のデータ通知の待機時間(initialDelay)を指定できるものもあり、そのメソッドを使うことで最初の「0」を通知するタイミングだけ変えることができます。

 そして、intervalメソッドで生成したFlowable/Observableは完了をすることがないので、完了を通知するにはtakeメソッドなどで通知するデータの個数を制限するなどしないといけません。

 あと、これはJavaで時間を扱う処理をすること全般におけることですが、指定した時間は正確ではなく、CPUの負荷などの影響を受けるため、ある程度の誤差は発生します。

サンプル

 次のサンプルでは、intervalメソッドを使って1000ミリ秒ごとに「0」から始まる数値を通知するFlowableを生成しています。また、通知している間隔を見るために、データを受け取った際に実行時刻を出力しています。

interval(time, unit)のサンプル
  public static void main(String[] args) throws Exception {
    // 「分:秒.ミリ秒」の文字列に変換するFormatter
    final DateTimeFormatter formatter =
        DateTimeFormatter.ofPattern("mm:ss.SSS");
    
    // 1000ミリ秒ごとに数値を通知するFlowableの生成
    Flowable<Long> flowable = 
        Flowable.interval(1000L, TimeUnit.MILLISECONDS);  // ①
    
    // 処理を開始する前の時間
    System.out.println("開始時間: " + LocalTime.now().format(formatter));
    
    // 購読する
    flowable.subscribe(data -> {  // ②
      // Thread名の取得
      String threadName = Thread.currentThread().getName();
      // 現在時刻の「分:秒.ミリ秒」を取得
      String time = LocalTime.now().format(formatter);
      // 出力
      System.out.println(threadName + ": " + time + ": data=" + data);
    });
    
    // しばらく待つ
    Thread.sleep(5000L);
  }
  1. intervalメソッドを使って、1000ミリ秒ごとに「0」「1」「2」と順に数値を通知するFlowableを生成。
  2. 通知されたデータを受け取った際の時刻を「分:秒.ミリ秒」で出力させる。
実行結果
開始時間: 55:29.085
RxComputationThreadPool-1: 55:30.105: data=0
RxComputationThreadPool-1: 55:31.105: data=1
RxComputationThreadPool-1: 55:32.106: data=2
RxComputationThreadPool-1: 55:33.105: data=3
RxComputationThreadPool-1: 55:34.105: data=4

 実行結果より、ほぼ1秒(1000ミリ秒)単位で「0」「1」「2」と順に数値が通知されていることが分かります。また、デフォルトの設定だと、最初の「0」を通知するまでに1秒(1000ミリ秒)ほど待機しているのも分かります。

timer

マーブルダイアグラム
マーブルダイアグラム
  • timer(long time, TimeUnit unit)
  • timer(long time, TimeUnit unit, Scheduler scheduler)

 timerメソッドは呼び出されてから指定した時間だけ待機した後、1つのLong値「0」を通知し完了するFlowable/Observableを生成するオペレータです。このtimerメソッドで生成されたFlowable/ObservableはデフォルトでSchedulers.computation()のScheduler上で実行され、呼び出し元のスレッドとは異なるスレッド上で実行されます。このSchedulerを変更するには引数にSchedulerを受け取るメソッドがあり、それを使うことでデフォルトのSchedulerを変更できます。

 また、前述したように指定した時間は正確ではなく、CPUの負荷などの影響を受けるため、ある程度の誤差は発生します。

サンプル

 次のサンプルでは、timerメソッドを使って1000ミリ秒後に「0」を通知するFlowableを生成しています。また、通知している待機時間を見るために、データを受け取った際に実行時刻を出力しています。

timer(time, unit)のサンプル
public static void main(String[] args) throws Exception {
  // 「分:秒.ミリ秒」の文字列に変換するFormatter
  final DateTimeFormatter formatter =
      DateTimeFormatter.ofPattern("mm:ss.SSS");
  
  // 処理を開始する前の時間
  System.out.println("開始時間: " + LocalTime.now().format(formatter));
  
  // 1000ミリ秒後に数値「0」を通知するFlowableの生成
  Flowable<Long> flowable = 
      Flowable.timer(1000L, TimeUnit.MILLISECONDS);  // ①
  
  // 購読開始
  flowable.subscribe(
      // 第1引数: データの通知時
      data -> {  // ②
        // Thread名の取得
        String threadName = Thread.currentThread().getName();
        // 現在時刻の「分:秒.ミリ秒」を取得
        String time = LocalTime.now().format(formatter);
        // 出力
        System.out.println(threadName + ": " + time + ": data=" + data);
      },
      // 第2引数: エラーの通知時
      error -> System.out.println("エラー=" + error),
      // 第3引数: 完了の通知時
      () -> System.out.println("完了"));  // ③
  
  // しばらく待つ
  Thread.sleep(1500L);
}
  1. timerメソッドを使って、1000ミリ秒後に「0」を通知するFlowableを生成。
  2. 通知されたデータを受け取った際の時刻を「分:秒.ミリ秒」で出力させる。
  3. 完了の通知を受けとった際に「完了」と出力させる。
実行結果
開始時間: 55:02.564
RxComputationThreadPool-1: 55:03.684: data=0
完了

 実行結果より、ほぼ1000ミリ秒後に「0」を通知していることが分かります。また、データを通知した後に完了も通知していることが分かります。

通知するデータを制限するオペレータ

 Flowable/Observableが通知するデータの中には、場合によっては必要ないデータが含まれることがあります。このような不要なデータを受け取り、その受け取ったデータに対して変換などしても、最終的にそのデータを使って処理をしないことになるため無駄に時間とリソースを使ってしまうことになります。ここでは、このような不要なデータを通知せず、必要なデータのみを通知するためのオペレータについて見ていきます。このような通知するデータを制限する代表的なオペレータとして次のものがあります。

  • filter
  • take
  • skip
  • distinctUnitlChanged
  • throttleWithTimeout/debounce

 この他にも通知データの制限を行うさまざまなオペレータがRxJavaには用意されています。さらにどのようなメソッドがあるのかを知りたい場合は、英語ですが、次のGitHub上にあるRxJavaのWikiを参照してください。

filter

マーブルダイアグラム
マーブルダイアグラム
メソッド
  • filter(Predicate<? super T>predicate)

 filterメソッドは受け取ったデータが条件に合うか判定し、結果がtrueのもののみを通知するオペレータです。条件の判定は引数の関数型インターフェースで行います。この引数のPredicateは通知されたデータを受け取り、そのデータを通知する場合はtrueを、通知しない場合はfalseを返すように実装します。この引数のPredicateはRxJavaのものでパッケージはio.reactivex.functionsになります。

predicateの実装例
// 偶数なら通知する
new Predicate<Long>() {
  
  @Override
  public boolean test(Long data) throws Exception {
    return data % 2 == 0;
  }
};

サンプル

 次のサンプルでは、intervalメソッドで生成したFlowableに対し、filterメソッドを使って偶数しか通知しないようにしています。

filter(predicate)のサンプル
public static void main(String[] args) throws Exception {
  Flowable<Long> flowable =
      // Flowableの生成
      Flowable.interval(300L, TimeUnit.MILLISECONDS)  // ①
          // 偶数のみ通知する
          .filter(data -> data % 2 == 0);  // ②
  
  // 購読する
  flowable.subscribe(new DebugSubscriber<>());
  
  // しばらく待つ
  Thread.sleep(3000L);
}
  1. intervalメソッドで300ミリ秒ごとに「0」「1」「2」と順に数値を通知するFlowableを生成。
  2. filterメソッドで通知されたデータが偶数ならtrueを返し、偶数のデータのみ通知するFlowableを生成。
実行結果
RxComputationThreadPool-1: 0
RxComputationThreadPool-1: 2
RxComputationThreadPool-1: 4
RxComputationThreadPool-1: 6
RxComputationThreadPool-1: 8

 実行結果より、Predicateで定義したように偶数のデータのみ通知されていることが分かります。

take

マーブルダイアグラム
マーブルダイアグラム
主なメソッド
  • take(long count)
  • take(long time, TimeUnit unit)

 takeメソッドは引数に指定したデータ数や期間に達するまで、受け取ったデータを通知するオペレータです。指定したデータ数や期間に達したら、完了を通知して処理を終了します。もし、指定した範囲が元の生産者が通知するデータ数より多い場合、元のデータをすべて通知して、完了します。例えば、空のFlowable/Observableに対してtakeメソッドを使ってもエラーにはならず、元の空のFlowable/Observableのように完了のみを通知します。

サンプル

 次のサンプルでは、intervalメソッドで生成したFlowableに対し、takeメソッドを使って3件までしかデータを通知しないようにしています。

take(count)のサンプル
public static void main(String[] args) throws Exception {
  Flowable<Long> flowable =
      // Flowableの生成
      Flowable.interval(1000L, TimeUnit.MILLISECONDS)
          // 3件まで通知する
          .take(3);
  
  // 購読する
  flowable.subscribe(new DebugSubscriber<>());
  
  // しばらく待つ
  Thread.sleep(4000L);
}
  1. intervalメソッドで1000ミリ秒ごとに「0」「1」「2」と順に数値を通知するFlowableを生成。
  2. takeメソッドで最初の3件のデータを通知させ完了させる。
実行結果
RxComputationThreadPool-1: 0
RxComputationThreadPool-1: 1
RxComputationThreadPool-1: 2
RxComputationThreadPool-1: 完了

 実行結果より最初の3件のデータのみ通知されていることが分かります。

skip

マーブルダイアグラム
マーブルダイアグラム
主なメソッド
  • skip(long count)
  • skip(long time, TimeUnit unit)

 skipメソッドは最初に通知されるデータから指定した分だけのデータをのぞき、その後からのデータを通知するオペレータです。スキップする範囲はデータ数や経過時間を指定できます。もし、指定した範囲が元のFlowable/Observableが通知するデータ数より多い場合、データを通知することなく完了を通知して処理を終了します。例えば、空のFlowable/Observableに対してskipメソッドを使ってもエラーにはならず、元の空のFlowable/Observableのように完了のみを通知します。

サンプル

 次のサンプルでは、intervalメソッドで生成したFlowableに対し、skipメソッドを使って最初の2件のデータを通知しない(スキップする)ようにしています。

skip(count)のサンプル
public static void main(String[] args) throws Exception {
  Flowable<Long> flowable =
      // Flowableの生成
      Flowable.interval(1000L, TimeUnit.MILLISECONDS)
          // 最初の2件は通知しない
          .skip(2);
  
  // 購読する
  flowable.subscribe(new DebugSubscriber<>());
  
  // しばらく待つ
  Thread.sleep(5000L);
}
  1. intervaleメソッドで1000ミリ秒ごとに「0」「1」「2」と順に数値を通知するFlowableを生成。
  2. skipメソッドで最初の2件を通知しないFlowableを生成。
実行結果
RxComputationThreadPool-1: 2
RxComputationThreadPool-1: 3
RxComputationThreadPool-1: 4

 実行結果より最初の「0」「1」が通知されていないことが分かります。

distinctUnitlChanged

マーブルダイアグラム
マーブルダイアグラム
主なメソッド
  • distinctUntilChanged()

 distinctUntilChangedメソッドは直前に通知したデータと等しいデータを連続して通知しようとしている場合にそのデータを除外して通知するオペレータです。例えば「A」を通知した後に再び「A」を通知しようとした場合、後者のデータは通知されません。ただし、重複していても連続していない場合は除外しません。

サンプル

 次のサンプルではjustメソッドで生成したFlowableに対し、distinctUntilChangedメソッドを使って、すでに通知したデータと連続して重複するデータを除いて通知しています。

distinctUntilChanged()のサンプル
public static void main(String[] args) {
  Flowable<String> flowable =
      // Flowableの生成
      Flowable.just("A", "a", "a", "A", "a")
          // 連続して重複したデータを除いて通知する
          .distinctUntilChanged();
  
  // 購読する
  flowable.subscribe(new DebugSubscriber<>());
}
  1. justメソッドで引数のデータを通知するFlowableを生成。
  2. distinctUntilChangedメソッドで通知しているデータが連続して重複している場合、そのデータを通知しないFlowableを生成。
実行結果
main: A
main: a
main: A
main: a
main: 完了

 実行結果よりデータが連続して重複している場合、そのデータは通知されていないことが分かります。ただし、一度除外されたデータであっても、連続していない場合は通知されることが分かります。

throttleWithTimeout/debounce

マーブルダイアグラム
マーブルダイアグラム
主なメソッド
  • throttleWithTimeout(long time, TimeUnit unit)
  • debounce(long time, TimeUnit unit)

 throttleWithTimeout/debounceメソッドは元のFlowable/Observableからデータを受け取った後、指定した期間内に別のデータを受け取らなければ、そのデータを通知することを繰り返すオペレータです。期間内に別のデータが来たら、次はそのデータから指定した期間に別のデータが来ないか計測します。ただし、指定した時間内であっても完了やエラーの通知は可能で、完了が通知される場合は最後に通知されるデータとともに完了を通知し、エラーの場合はエラーのみ通知します。

 また、throttleWithTimeoutメソッドもdebounceメソッドも引数が同じメソッドは、名前が違うだけで同じ処理を行います。

サンプル

 次のサンプルでは、createメソッドで生成したFlowableに対し、throttleLastメソッドを使って、データを受け取ってから500ミリ秒間に他のデータを受け取らない場合にそのデータを通知するようにしています。

throttleWithTimeout(time, unit)のサンプル
public static void main(String[] args) throws Exception {
  Flowable<String> flowable =
      // Flowableの生成
      Flowable.<String> create(  // ①
          // 通知処理
          emitter -> {
            // データを通知し、しばらく待つ
            emitter.onNext("A");
            Thread.sleep(1000L);
            
            emitter.onNext("B");
            Thread.sleep(300L);
            
            emitter.onNext("C");
            Thread.sleep(300L);
            
            emitter.onNext("D");
            Thread.sleep(1000L);
            
            emitter.onNext("E");
            Thread.sleep(100L);
            
            // 完了を通知
            emitter.onComplete();
          }, BackpressureStrategy.BUFFER)
          // 指定した期間に次のデータが来なければ通知する
          .throttleWithTimeout(500L, TimeUnit.MILLISECONDS);  // ②
  
  // 購読する
  flowable.subscribe(new DebugSubscriber<>());
}
  1. createメソッドでデータを通知した後にしばらく待ってから、次のデータを通知するFlowableを生成する。
  2. throttleWithTimeoutメソッドでデータを通知してから500ミリ秒間次のデータを通知しないデータのみ通知するようにする。
実行結果
RxComputationThreadPool-1: A
RxComputationThreadPool-1: D
main: E
main: 完了

 実行結果より「A」を受け取った後の500ミリ秒間に次のデータを受け取らなかったので、「A」が通知できたことが分かります。続くデータ「B」「C」は通知後の500ミリ秒間に次のデータを受け取っているので、これらのデータは通知できません。続く「D」は受け取った後の500ミリ秒間に次のデータを受け取らなかったので「D」を通知することが可能になっています。続く「E」は、500ミリ秒待つ間に完了の通知を受け取っているので、500ミリ秒待つことなく、「E」と完了を通知していることが分かります。

データを変換して通知するオペレータ

 Flowable/Observableが通知するデータをそのまま通知するのではなく、そのデータから新たなデータを生成して通知したいことがよくあります。例えば、商品のIDを通知するFlowable/ObservableからIDを受け取り、そのIDに該当する商品のオブジェクトを取得してデータとして通知したい場合など、何らかの変換を行いたい場合です。SubscriberやObserverではなく、オペレータで処理を行うことで、Subscriber/Observerでは最終的なデータを受け取った際の処理のみ行い、オペレータではデータの加工をするように役割を分担できるメリットもあります。ここでは、通知するデータから新たなデータを生成し、そのデータを通知するための代表的なオペレータについて見ていきます。

  • map
  • flatMap

 この他にも通知データを変換するさまざまなオペレータがRxJavaには用意されています。さらにどのようなメソッドがあるのかを知りたい場合は、英語ですが、次のGitHub上にあるRxJavaのWikiを参照してください。

map

マーブルダイアグラム
マーブルダイアグラム
主なメソッド
  • map(Function<? super T,? extends R> mapper)

 mapメソッドは元のFlowable/Observableから通知されたデータを変換し、その変換したデータを通知するオペレータです。どのように変換するかは引数の関数型インターフェースで定義します。ただし、受け取ったデータに対し必ず1件のデータを通知しなければならず、変換した結果がnullになるようなデータや複数になるデータには変換できません。

 また、引数のmapperは元のデータをどのように変換するのかを定義する関数型インターフェースです。この引数のFunctionはRxJavaのものでパッケージはio.reactivex.functionsになります。変換したデータの型は受け取ったデータの型と同じでも違っていてもどちらでも構いません。

mapperの実装例
// StringのデータをBigDecimalに変換する
new Function<String, BigDecimal>() {
    
    @Override
    public BigDecimal apply(String data) throws Exception {
        return new BigDecimal(data);
    }
}

サンプル

 次のサンプルでは元の大文字のデータをmapメソッドを通して小文字のデータに変換して通知するようにしています。

map(mapper)のサンプル
public static void main(String[] args) throws Exception {
    
    Flowable<String> flowable =
            // 引数のデータを順に通知するFlowableの生成
            Flowable.just("A", "B", "C", "D", "E")  // ①
                    // mapメソッドを使って小文字に変換
                    .map(data -> data.toLowerCase()); // ②
    
    // 購読開始
    flowable.subscribe(new DebugSubscriber<>());
}
  1. "A", "B", "C", "D", "E"を順に通知するFlowableを生成。
  2. mapメソッドで元のデータをtoLowerCase()で小文字に変換し、それを通知するFlowableを生成。
実行結果
main: a
main: b
main: c
main: d
main: e
main: 完了

 実行結果より大文字のデータを小文字のデータに変換して通知していることが分かります。今回のサンプルでは受け取ったデータと同じ型を返していますが、異なる型を返すことも可能です。

flatMap

マーブルダイアグラム
マーブルダイアグラム
Flowable/Observableの主なメソッド
  • flatMap(Function<? super T,? extends Publisher/ObservableSource<? extends R>> mapper)

 「Publisher/ObservableSource」はFlowableの場合はPublisherになり、Observableの場合はObservableSourceになることを表す。

 flatMapメソッドはmapメソッドのように元のデータを変換し、その変換したデータを通知するオペレータです。しかし、flatMapメソッドではmapメソッドと異なり、受け取ったデータから複数のデータを持つFlowable/Observableを返すことで、1つのデータから複数のデータを通知することが可能になります。さらに、空のFlowable/Observableを返すことで特定のデータの通知を止めたり、エラーのFlowable/Observableを返すことでエラーを通知したりすることが可能になります。

 このflatMapメソッドの関数型インターフェースでFlowable/Observableを返すということはメソッドチェインにおいて重要になるポイントです。例えば、変換の結果がnullのデータや条件に合わないデータを受け取った場合、空のFlowable/Observableを返すことで、後続の処理にはそのデータを通知しなくしたり、処理中にエラーが発生する場合、エラーのFlowable/Observableを返すことでエラーの通知をしたりできます。

 また、引数のmapperは通知されるデータを受け取り、それに対し何らかの変換を行い、その変換をしたデータを通知するFlowable/Observableを返す関数型インターフェースです。最終的にmapperの戻り値となるFlowable/Observableが持つデータがそれぞれ通知されます。そのため関数型インターフェースの戻り値が複数のデータを含むFlowable/Observableを返す場合、1つのデータから複数のデータを通知できるようになります。また、空のFlowable/Observableを返す場合は、そのデータを通知させないようにできます。さらに、エラーを返すFlowable/Observableを戻り値とした場合、エラーが通知され次のデータの処理が行われなくなります。

mapperの実装例

// 受け取った各データを2度通知するようにする
new Function<Integer, Flowable<? extends Integer>>() {
  
  @Override
  public Flowable<? extends Integer> apply(Integer data)
      throws Exception {
    return Flowable.just(data, data);
  }
}

 また、この戻り値が時間を扱うようなFlowable/Observableにした場合、そのFlowable/ObservableはflatMapメソッドが実行しているスレッドとは異なるスレッド上で処理を行うので注意が必要です。関数型インターフェースで受け取ったデータからFlowable/Observableを生成する処理自体はシーケンシャルに行われますが、その生成されたFlowable/Observableが行う処理は非同期で行われます。つまり、戻り値となるFlowable/Observableを別スレッド上で動かされることで、通知されるデータがそれぞれ異なるスレッド上で行われた処理結果になるため、受け取ったデータの順に結果が通知がされなくなる可能性があります。もし、データ順にこだわる必要がある場合は、今回は説明していないconcatMapメソッドやconcatMapEagerメソッドを使う必要があります。

サンプル

 次のサンプルでは空文字が含まれているデータをflatMapメソッドを通して大文字と小文字の2つのデータを300ミリ秒後に通知するようにしています。その際に空文字を取り除いて通知するようにしています。また、300ミリ秒遅らせるのに今回は説明していないdelayメソッドを使っていますが、このメソッドは受け取った通知を指定した時間だけ遅らせて通知するオペレータです。

flatMap(mapper)のサンプル

public static void main(String[] args) throws Exception {
  // 引数のデータを順に通知するFlowableの生成
  Flowable<String> flowable = Flowable.just("A", "B", "", "", "C")  // ①
      // flatMapメソッドを使って空文字を除きかつ小文字に変換
      .flatMap(data -> {
        if ("".equals(data)) {
          // 空文字なら空のFlowableを返す
          return Flowable.empty();  // ②
        } else {
          // 受け取ったデータの大文字と小文字を3000ミリ秒後に通知する
          return Flowable
              .just(data.toUpperCase(), data.toLowerCase())
              .delay(300L, TimeUnit.MILLISECONDS);  // ③
        }
      });
  
  // 購読開始
  flowable.subscribe(new DebugSubscriber<>());

  // しばらく待つ
  Thread.sleep(1000L);
}
  1. justメソッドより"A", "B", "", "", "C"を順に通知するFlowableを生成。
  2. 受け取ったデータが空文字なら空のFlowableを返す。
  3. 受け取ったデータが空文字でないなら、その文字を大文字と小文字の2つのデータを300ミリ秒後に通知するFlowableを返す。
実行結果

RxComputationThreadPool-1: A
RxComputationThreadPool-1: a
RxComputationThreadPool-3: C
RxComputationThreadPool-3: c
RxComputationThreadPool-2: B
RxComputationThreadPool-2: b
RxComputationThreadPool-2: complete

 実行結果より受け取ったデータを大文字と小文字のデータに変換して通知し、かつ空文字のデータを除外していることが分かります。加えて、関数型インターフェースで生成しているFlowableがそれぞれ異なるスレッド上で実行されるため、結果として通知されるデータは受け取った順になっていないこともポイントの一つになります。また、今回のサンプルでは受け取ったデータと同じ型(String)を返していますが、異なる型(Stringでないクラス)のデータを返すことも可能です。

まとめ

 今回はFlowableやObservableの基本的なオペレータについて見ていきました。ここで見たのは代表的なもので、ここで紹介したもの以外にも数多くのオペレータが用意されています。実際にRxJavaで実装する際に、ここで紹介したオペレータ以外に欲しいものがある可能性が高いです。その場合は、英語なので読むのが大変かもしれませんが、RxJavaのJavaDocを見てみるのがよいでしょう。その際のコツとしては、JavaDocのマーブルダイアグラムを見て、イメージにあったオペレータかどうかを判別して、もしイメージに合うオペレータなら説明を読むようにします。そして、簡単なサンプルを作って実際の動作確認をしてみるのがよいでしょう。