最終更新日:2020/09/02 原本2016-11-09

RxJava 2.xを用いた簡単なサンプルプログラムの実装

RxJavaによるリアクティブプログラミング入門(4)

 この連載はRxJavaを使って、リアクティブプログラミングにおけるポイントやRxJavaが持つ機能について学んでいくことを目的としています。前回はReactive StreamsとRxJava 2.xのポイントについて簡単に見ましたが、今回はこのRxJava 2.xを使って実際に実装すると、どうなるのかについて見ていきます。

 RxJava 1.xの回では"Hello, World!"と"こんにちは、世界!"を通知し、受け取った通知に対しての出力を行うサンプルを作りました。今回はこのサンプルと同じような処理をするものをRxJava 2.xを使って実装してみましょう。

対象読者

  • Java経験者(初心者可)
  • RxJava未経験者
  • リアクティブプログラミング未経験者

 ※ただし、前回までの連載を読んでいる前提です。

環境構築

 RxJava 2.xではRxJava 1.xとは違いReactive Streamsとの依存があるため、RxJava 2.xのjarだけでなく、Reactive Streamsのjarにもパスを通すことで初めてRxJava 2.xを使えるようになります。そのため、MavenやGradleを使ってRxJava 2.xとReactive Streamsとの間の依存関係を自動で解決させるようにしたほうがよいでしょう。

 Mavenを使って環境を構築する場合、pom.xmlに次のdependencyを追加して取得することでRxJava 2.xが使える環境を構築できます。今回は執筆時点での最新版である2.0.0を使います。

Mavenの場合
<dependency>
    <groupId>io.reactivex.rxjava2</groupId>
    <artifactId>rxjava</artifactId>
    <version>2.0.0</version>
</dependency>

 同様にGradleからも追加が可能です。dependenciesに下記を追加してください。

Gradleの場合
dependencies {
    …… 
    compile 'io.reactivex.rxjava2:rxjava:2.0.0'
}

 MavenやGradleを使うと、Reactive Streamsの記述を行わなくても自動的にRxJava 2.xが依存しているReactive Streamsのjarをダウンロードしてくれます。こうすることで、新しいバージョンに変更する際に依存するライブラリのバージョンの対応を意識しなくてよくなります。

 しかし、これらのツールを使わずに環境を構築したい場合、Mavenの検索サイトから直接jarを取得することも可能です。

 その場合、Mavenの検索サイトから、

g:"io.reactivex.rxjava2" AND a:"rxjava"

と入力し検索すると、RxJava 2.xの最新のjarがダウンロードできるようになります。

Mavenの検索画面(RxJava 2.x)
Mavenの検索画面(RxJava 2.x)

 ただし、この場合はReactiveStreamsのjarが含まれていないので自分でダウンロードする必要があります。そのため、Mavenの検索サイトから、

g:"org.reactivestreams" AND a:"reactive-streams"

で検索し、Reactive Streamsの対象のバージョンのjarをダウンロードしないといけません。

Mavenの検索画面(Reactive Streams)

Mavenの検索画面(Reactive Streams)

 現段階ではReactive Streamsのバージョンは1.0.0なのですが、今後それぞれがバージョンアップした際は対応するバージョンを意識しないといけないので、各jarを個別に取得して設定する場合は注意してください。

Flowable(Reactive Streams対応)を使ったサンプル

 それでは、実際にRxJava 2.xを使ったサンプルを作成してみましょう。まずはReactive Streamsに対応しているFlowableで実装した場合を見てみましょう。今回のサンプルもRxJava 1.xのサンプルと同じように「Hello, World!」と「こんにちは、世界!」のデータを通知し、その受け取ったデータを出力して、すべてのデータを通知した後に完了の通知を行い、「完了しました」と出力するようにします。FlowableにはRxJava 1.xのObservableと同様にcreateメソッドがあるので、このcreateメソッドを使って実装を行います。

 また今回のサンプルでは、バックプレッシャーをかけるためにはどのようにデータをリクエストするのかを見るために、データを1件ずつ通知するようにリクエストし、その通知が処理されたら再度1件のデータをリクエストすることを繰り返し行うようにしています。さらに通知するデータ量が少ないこともあり、通知待ちのデータはすべてバッファし通知するまで保持するようにしています。

 今回のサンプルの処理の流れは次のようになります。

サンプルのシーケンス図
サンプルのシーケンス図
  1. SubscriberがFlowableを購読し、Flowableの処理を開始する。
  2. FlowableがSubscriotionを生成する。
  3. Flowableが購読を開始したことをSubscriberに通知(onSubscribe)し、その際に生成したSubscriptionを渡す。
  4. Subscriberが受け取ったSubscriptionのrequestメソッドを使って1件だけデータを通知するようにリクエストする。
  5. Flowableが文字列"Hello, World!"を通知する。
  6. Subscriberがデータを受け取り、"Hello, World!"と出力する。
  7. SubscriberがSubscriptionのrequestメソッドを使って1件だけデータを通知するようにリクエストする。
  8. Flowableが文字列"こんにちは、世界!"を通知する。
  9. Subscriberがデータを受け取り、"こんにちは、世界!"と出力する。
  10. すべてのデータを通知した後にFlowableが完了(onComplete)したことを通知する。
  11. 完了の通知を受け取ったSubscriberが"完了しました"と出力する。

 また、このサンプルではデータを受け取るSubscriberの処理をFlowableの処理とは異なるスレッド上で行うようにしており、どのスレッドで処理を行っているのかを確認するため、実行しているスレッド名も出力しています。

Flowableで実装した場合のサンプル
public static void main(String[] args) throws Exception {
  
  // あいさつの言葉を通知するFlowableの生成
  Flowable<String> flowable =
      Flowable.create(new FlowableOnSubscribe<String>() {
        
        @Override
        public void subscribe(FlowableEmitter<String> emitter)
            throws Exception {
          
          String[] datas = { "Hello, World!", "こんにちは、世界!" };
          
          for (String data : datas) {
            // 購読解除されている場合は処理をやめる
            if (emitter.isCancelled()) {
              return;
            }
            
            // データを通知する
            emitter.onNext(data);
          }
          
          // 完了したことを通知する
          emitter.onComplete();
        }
      }, BackpressureStrategy.BUFFER);  // 超過したデータはバッファする
  
  flowable
      // Subscriberの処理を別スレッドで行うようにする
      .observeOn(Schedulers.computation())
      // 購読する
      .subscribe(new Subscriber<String>() {
      
        /** データ数のリクエストおよび購読の解除を行うオブジェクト */
        private Subscription subscription;
        
        // 購読が開始された際の処理
        @Override
        public void onSubscribe(Subscription subscription) {
          // SubscriptionをSubscriber内で保持する
          this.subscription = subscription;
          // 受け取るデータ数をリクエストする
          this.subscription.request(1L);
        }
        
        // データを受け取った際の処理
        @Override
        public void onNext(String item) {
          // 実行しているスレッド名の取得
          String threadName = Thread.currentThread().getName();
          // 受け取ったデータを出力する
          System.out.println(threadName + ": " + item);
          
          // 次に受け取るデータ数をリクエストする
          this.subscription.request(1L);
        }
        
        // 完了を通知された際の処理
        @Override
        public void onComplete() {
          // 実行しているスレッド名の取得
          String threadName = Thread.currentThread().getName();
          System.out.println(threadName + ": 完了しました");
        }
        
        // エラーを通知された際の処理
        @Override
        public void onError(Throwable error) {
          error.printStackTrace();
        }
      });
    
  // しばらく待つ
  Thread.sleep(500L);
}
実行結果
RxComputationThreadPool-1: Hello, World!
RxComputationThreadPool-1: こんにちは、世界!
RxComputationThreadPool-1: 完了しました

 それでは、サンプルでは何をやっているのか見てみましょう。まずはデータを通知するFlowableについて見ていきます。

Flowable

 Flowableが行っている処理は次のようになっています。

// あいさつの言葉を通知するFlowableの生成
Flowable<String> flowable =
    Flowable.create(new FlowableOnSubscribe<String>() { // ……(1)
      
      @Override
      public void subscribe(FlowableEmitter<String> emitter)
          throws Exception {  // ……(2)
        
        String[] datas = { "Hello, World!", "こんにちは、世界!" };
        
        for (String data : datas) {
          // 購読が解除された場合は処理をやめる……(3)
          if (emitter.isCancelled()) {
            return;
          }
          
          // データを通知する……(4)
          emitter.onNext(data);
        }
        
        // 完了したことを通知する……(5)
        emitter.onComplete();
      }
    }, BackpressureStrategy.BUFFER);  // 超過したデータはバッファする……(6)

 (1)を見て分かるように、実際にデータを通知する処理はcreateメソッドの引数であるFlowableOnSubscribeインターフェースのsubscribeメソッドで行われています。そしてこのsubscribeメソッドの引数であるFlowableEmitterインターフェースを通じてSubscriberに通知を行っています。

 このFlowableEmitterは通知メソッド(onNextメソッド、onErrorメソッド、onCompleteメソッド)の内部で購読が解除されたか確認するようになっています。そのため、RxJava 1.xの通知メソッドと異なり、購読が解除されている状態でFlowableEmitterのonNextメソッドなどの通知メソッドを呼んでも、Subscriberには通知が行われないようになっています。

 また、FlowableOnSubscribeは関数型インターフェースであるため、ラムダ式を使って次のように記述することも可能です。

// あいさつの言葉を通知するFlowableの生成
Flowable<String> flowable = Flowable.create(emitter -> {
   String[] datas = { "Hello, World!", "こんにちは、世界!" };
  …… 
}, BackpressureStrategy.BUFFER);

 さらに(2)を見てみると、subscribeメソッドがRxJava 1.xと異なりExceptionをthrowするようになっています。このことにより処理中に例外をcatchする必要がなくなり、発生した例外はFlowable内部の呼び出し元でcatchされ、致命的なエラーでない限りはSubscriberにエラーの通知を行うようになっています。

FlowableCreate.javaの例外処理のための実装部分
public final class FlowableCreate<T> extends Flowable<T> {
  
  final FlowableOnSubscribe<T> source;
  
  …… 
  
  // subscribeメソッド内で呼ばれるメソッド
  @Override
  public void subscribeActual(Subscriber<? super T> subscriber) {
    BaseEmitter emitter;
    …… 
    try {
      source.subscribe(emitter);  // 実装したcreateメソッド内の処理を行う
    } catch (Throwable ex) {
      Exceptions.throwIfFatal(ex);
      emitter.onError(ex);
    }
    …… 
  }
  …… 
}

 一応、createメソッド内でエラーの通知を明示的に行わなくても、エラーが発生した場合はSubscriberに通知されますが、FlowableEmitterのonErrorメソッドを使って明示的にエラーを通知することも可能です。ただし、その場合は速やかに処理をやめ、エラー通知後に何も他の通知が行われないように注意しないといけません。

 (3)では、購読が解除された場合はそれ以上処理を行わないようにしています。RxJava 2.xでは購読が解除された場合、onNextメソッドなどの通知メソッドを呼び出しても、通知を行わないようになっていますが、createメソッドを使って実装している場合、購読が解除された後もFlowableの処理を続けるかどうかは実装者の責任になります。つまり、RxJavaがしてくれるのは通知メソッドを呼び出しても通知を行わないことだけで、処理自体を止めてくれるわけではありません。そのため、createメソッドを使って大量のデータをループしながら通知するFlowableを生成した場合や、完了することなく永遠にデータを通知するようなFlowableを生成した場合、購読解除された際の処理を自分で実装していないと処理が実行され続け無駄にリソースを費やすことになります。

 続いて(4)では、FlowableEmitterのonNextメソッドに引数にデータを渡すことで、そのデータをSubscriberに通知しています。ただし、この時に注意すべき点として、RxJava 1.xと異なりRxJava 2.xからはnullが通知できなくなったことがあります。もしonNextメソッドの引数にnullを渡した場合、NullPointerExceptionが発生します。

 (5)では、すべてのデータを通知した後にFlowableEmitterのonCompleteメソッドを呼ぶことでSubscriberにFlowableの処理が完了したことを通知しています。このメソッドも購読が解除されている場合は通知が行われないように挙動がRxJava 1.xから変更されており、さらにメソッド名もRxJava 1.xから変更され最後の「d」がなくなっています。そして、完了の通知をした後は他の通知をすべきではないので、onCompleteメソッドを呼んだ後は何もしていないこともポイントの一つになります。

 最後に(6)ではFlowableのcreateメソッドの第2引数にどのようなバックプレッシャーを行うのかのオプションを指定する列挙型のBackpressureStrategyを指定します。このBackpressureStrategyは2.0.0-RC4までBackpressureModeと呼ばれていたもので、2.0.0-RC5よりBackpressureStrategyにマージされました。今回は生成されたデータを通知するまですべて貯めておく(バッファする)BackpressureStrategy.BUFFERを設定しています。このモードは、Flowableによるデータの生成スピードがSubscriberによるデータの処理スピードより速い場合、通知できずに通知待ちとなったデータをすべてバッファし、通知できるようになるまで保持するモードです。

 BackpressureStrategyは、この他にも通知待ちのデータを破棄するBackpressureStrategy.DROPや、すべてではなく最新のデータのみ保持するBackpressureStrategy.LATESTなどがあります。また、RxJavaでは内部で通知待ちのデータをバッファするようになっており、そのバッファサイズを調整して何件までデータを保持するのかを決めています。そのため、どのデータが通知され、どのデータが破棄されるのかはこのバッファサイズが影響することになります。

BackpressureStrategy
BackpressureStrategy 説明
BUFFER 通知されるまで、すべてのデータをバッファする。
DROP データを通知できるようになるまで、新たに生成されたデータを破棄する。
LATEST 生成した最新のデータのみをバッファし、生成されるたびにバッファするデータを置き換える。
ERROR 通知待ちのデータがバッファサイズを超す場合はMissingBackpressureExceptionのエラーを通知する。
MISSING 特定の処理を行わない。主にonBackpressureで始まるメソッドを使ってバックプレッシャーのモードを設定する場合に使われる。

 また、今回のサンプルでは使わなかったのですが、FlowableEmitterには購読が解除された際に指定した処理を行うsetCancellableメソッドや、他のDisposableを設定してまとめて購読解除するためのsetDisposableメソッドもあります。

Subscriber

 RxJava 2.xのSubscriberは、通知されたデータを受け取り、そのデータを使って何らかの処理を行うインターフェースです。さらにRxJava 2.xのSubscriberは受け取れるデータ数をリクエストする責任も持っており、このデータ数のリクエストを行わないとFlowableはデータを通知することはできません。これはReactive Streamsの仕様であり、そのため、このインターフェースはRxJavaのものではなくReactive Streamsのものになります。そして、このSubscriberのパッケージはorg.reactivestreamsです。

 今回のサンプルでは、前のRxJava 1.xを使ったサンプルと同様に、通知を受け取ったら実行されるスレッド名とともに標準出力していますが、さらにSubscriberが1件のデータの処理を終えるまで次のデータをSubscriberに通知しないようにFlowableの通知の制御(バックプレッシャー)を行っています。このときは通知するデータ数のリクエストはSubscriptionインターフェース経由で行います。そして、このSubscriptionもReactive Streamsのものであり、パッケージはorg.reactivestreamsになります。

Subscriber
new Subscriber<String>() {
  
  /** データ数のリクエストおよび購読の解除を行うオブジェクト */
  private Subscription subscription;
  
  // 購読が開始された際の処理
  @Override
  public void onSubscribe(Subscription subscription) { // ……(1)
    // SubscriptionをSubscriber内で保持する ……(2)
    this.subscription = subscription;
    // 受け取るデータ数をリクエストする ……(3)
    this.subscription.request(1L);
  }
  
  // データを受け取った際の処理
  @Override
  public void onNext(String item) {
    …… 略(受け取ったデータを使った処理をする)
    
    // 次に受け取るデータ数をリクエストする ……(4)
    this.subscription.request(1L);
  }
  
  …… 
}

 (1)を見て分かるように、RxJava 2.xで使われているSubscriberは、RxJava 1.xのObserverと異なり新たにonSubscribeメソッドが追加されています。このメソッドは購読が開始すると最初に呼ばれるメソッドであり、引数からSubscriptionを受け取ります。このSubscriptionは通知するデータ数のリクエストおよび購読の解除を行うReactive Streamsのインターフェースです。

 (2)では、onSubscribeメソッドの引数であるSubscriptionをSubscriber内で保持するために、受け取った引数をSubscriberのインスタンス変数に設定しています。こうすることでonSubscribeメソッドが終了してもSubscriptionを使えるようにしています。

 そして(3)で、Subscriptionのrequestメソッドを使って最初に通知されるデータ数をリクエストすることで、初めてデータの通知が開始されます。今回は最初にデータを1件だけ通知するようにリクエストしています。もし、このonSubscribeメソッド内でデータ数のリクエストを行っていないと、Flowableが通知するデータ数のリクエストが来るのを待っている状態になり、通知を始めることができなくなるので注意が必要です。

 また、Subscriptionのrequestメソッドの引数には1以上の数値しか受け付けず、Reactive Streamsの仕様と異なり、0以下の数値を与えてもログの出力だけをして、このリクエストを無視します。さらに、引数の上限はLong.MAX_VALUEになっており、Long.MAX_VALUEを設定すると無制限にデータを受け取るように挙動が変わります。つまり、Long.MAX_VALUEを渡すとデータ数の制限がなくなり、今後データ数のリクエストがなくてもデータを受け取り続けることになります。

 

 さらに、ここで重要なのが、requestメソッドの呼び出しをonSubscribeメソッドの最後で行っていることです。Flowableによってはrequestメソッドの呼び出しと同時にデータの通知を始めるものもあります。そのようなFlowableの場合、reuqestメソッド以降に記述してあるコードが、完了の通知を行うまで実行されなくなる可能性があります。そのためonSubscribeメソッド内でrequestメソッドを呼ぶ際は、最後に行うようにしてください。

 (4)では、onNextメソッドでデータを受け取った際の処理を行い、最後に次のデータを1件だけ通知するようにリクエストしています。そうすることで、次にデータを受け取ってもまた通知するデータ数をリクエストするので、生産者がすべてのデータを通知するまで繰り返しデータを受け取ることができるようになります。もし、次のデータ数をリクエストしていないと、最初にリクエストした分のデータを通知した後、それ以降データが通知されなくなるので注意が必要です。

 バックプレッシャーを効かせたデータの通知はこのようになります。ただし、データ数をリクエストし、リクエストした分のデータを処理した後に再度データ数をリクエストするようなことを繰り返し行うと、何らかの問題が発生してデータ数のリクエストがされなくなり、消費者がデータを受け取れる状況にもかかわらず、生産者がデータの通知待ち状態になってしまうリスクがあります。また、生産者と消費者での処理スピードのギャップがあまりない場合や通知するデータ数が多くない場合など、特に通知するデータ数を制限する必要がない場合も多くあります。このような場合、onSubscribeメソッド内で実行するrequestメソッドにLong.MAX_VALUEを指定することで、生産者はデータ数の制限なくデータを通知できるようになり、このようなリスクを減らすことができることになります。

データ数での制限をさせない場合
new Subscriber<String>() {
  
  …… 
  
  @Override
  public void onSubscribe(Subscription subscription) {
    …… 
    // データ数を制限することなくデータを通知するようにリクエストする
    subscription.request(Long.MAX_VALUE);
  }

  @Override
  public void onNext(String item) {
    受け取ったデータの処理のみ行い、データ数のリクエストは行わない
  }
  
  …… 
}

 また、このサンプルでは取り扱いませんでしたが、onSubscribeメソッドで受け取るSubscriptionは購読を解除する機能も持っています。もしSubscriberが購読を途中で解除する必要がある場合、Subscriptionのcancelメソッドを呼ぶことで通知処理を終わらせるように促すことができるようになります。

 さらにRxJavaでは、onSubscribeメソッドでLong.MAX_VALUEをリクエストすることを内部で行うResourceSubscriberクラスが用意されています。このResourceSubscriberは抽象クラスであり、基本的にはRxJava 1.xのObserverと同様に次のメソッドのみ実装すればよくなり、データ数のリクエストを忘れることを防げます。

  • onNext(T data)
  • onError(Throwable error)
  • onComplete()

 また、ResourceSubscriberではonSubscribeメソッドの実装がfinalで定義されていて、受け取ったSubscriptionが隠蔽されています。そのため、ResourceSubscriberを使う場合は直接Subscriptionにアクセスすることはできないようになっています。

 そのため、onSubscribeメソッドをオーバーライドすることは不可能なのですが、onSubscribeメソッド内から呼ばれているResourceSubscriberのonStartメソッドをオーバーライドすることで初期時のリクエストを行うことができます。ResourceSubscriberでは次のように実装されています。

ResourceSubscriberのonStartメソッドの実装
protected void onStart() {
    request(Long.MAX_VALUE);
}

 このonStartメソッドで呼ばれているrequestメソッドは、ResourceSubscriberの内部でSubscriptionを経由してデータ数をリクエストできるようにするメソッドで、ResourceSubscriberを使う場合は、このrequestメソッドを呼び出してデータ数のリクエストを行うことができるようになります。

 また、onStartメソッドでもonSubscribeメソッドと同様にrequestメソッドの呼び出しと同時にデータの通知を始め、reuqestメソッド以降に記述してあるコードが、完了の通知を行うまで実行されなくなる可能性があるため、onStartメソッド内で最後にrequestメソッドを呼ぶようにしてください。

 さらにResourceSubscriberはDisposableを実装しているため、購読を解除するdisposeメソッドも実装しています。このdisposeメソッドは内部でSubscriptionのcancelメソッドを呼んでいるので、Subscriptionに直接アクセスできなくてもdisposeメソッド経由で購読の解除を行うことを可能にしています。

Subscriptionの機能を呼び出すメソッド
メソッド 説明
request(long) 通知するデータ数をリクエストする。
dispose() 購読を解除する。Subscriptionのcancelメソッドが呼ばれる。

 また、RxJavaではResourceSubscriberと同じようなSubscriberとしてDisposableSubscriberが用意されています。このDisposableSubscriberもonSubscribeメソッドをfinalでオーバーライドして、データ数をLong.MAX_VALUEでリクエストし、Subscriptionを隠蔽しています。このDisposableSubscriberの使い方はResourceSubscriberとほぼ同じになります。今回は特に解説しませんが、両者の主な違いはResourceSubscriberが他のDisposableを保持することができるのに対し、DisposableSubscriberは他のDisposableを保持できない点にあります。

subscribeメソッド

 RxJava 2.xのSubscriberを引数に受け取るsubscribeメソッドは、戻り値を返さないようにRxJava 1.xから変更されています。これはReactive Streamsの仕様に沿ったもので、このsubscribeメソッドを呼んだ場合は、Subscriberの内部で購読の解除を行うような設計になっています。

 例えば先ほどのサンプルを使って、購読を開始してから500ミリ秒以上経過していたら購読を解除するような処理を行うようにしてみましょう。その場合、次のようにSubscriberのonNextメソッドでdisposeメソッドを呼びます。

途中で購読を解除するサンプル
public static void main(String[] args) throws Exception {
  
  // あいさつの言葉を通知するFlowableの生成
  Flowable<String> flowable = Flowable.create(emitter -> {
    …… 
  }, BackpressureStrategy.BUFFER); // 超過したデータはバッファする
  
  flowable
      // Subscriberの処理を別スレッドで行うようにする
      .observeOn(Schedulers.computation())
      // 購読する
      .subscribe(new ResourceSubscriber<String>() {
        
        /** 購読の開始時間 */
        private long startTime;
        
        // 購読が開始された際の処理
        @Override
        protected void onStart() {
          // 購読の開始時間を取得
          startTime = System.currentTimeMillis();
          // データ数のリクエスト
          request(Long.MAX_VALUE);
        }
        
        // データを受け取った際の処理
        @Override
        public void onNext(String data) {
          // 購読開始から500ミリ秒を過ぎた場合は購読を解除する
          if ((System.currentTimeMillis() - startTime) > 500L) {
            dispose();  //  購読を解除する
            System.out.println("購読解除しました");
            return;
          }
          
          // 重い処理をしているとみなして1000ミリ秒待機 
          // ※ 本来はThread.sleepは呼ぶべきではない
          try {
            Thread.sleep(1000L);
          } catch (InterruptedException e) {
            e.printStackTrace();
            System.exit(1);
          }
          
          // 実行しているThread名の取得
          String threadName = Thread.currentThread().getName();
          // 受け取ったデータを出力する
          System.out.println(threadName + ": " + data);
        }
        
        …… 
      });
  
  // しばらく待つ
  Thread.sleep(1500L);
}

 このサンプルでは生産者と消費者の処理スピードのギャップを作るため、SubscriberのonNextメソッドでスレッドをいったん停止しています。この箇所はそこで何かの重い処理をしていると考えてください。これを実行すると次のような結果が出力されます。

実行結果
RxComputationThreadPool-1: Hello, World!
購読解除しました

 ※非同期処理のため実行するたびに結果は変わる可能性があります。

 この時のSubscriberのonNextメソッドの処理の流れは次のようになっています。

  1. データ「Hello, World!」を受け取る。
  2. 購読開始から500ミリ秒より時間が経過しているかチェックする。
  3. 経過していないので1000ミリ秒待機し受け取ったデータを出力する。
  4. データ「こんにちは、世界!」を受け取る。
  5. 購読開始から500ミリ秒より時間が経過しているかチェックする。
  6. 経過しているので購読を解除し、onNextメソッドの処理を止める。
  7. 購読が解除されているので完了の通知が来なくなる。

 ここで特に認識しておくべき点は、500ミリ秒を経過したら購読を解除しているのではなく、onNextメソッドが呼ばれたタイミングで500ミリ秒を経過したかの判断をしている点です。つまり、500ミリ秒をすぎていてもonNextメソッドが呼ばれなければ購読が解除されることはありません。

 また、subscribeメソッドにはこのReactive Streamsの仕様に沿ったものの他に、Disposableを戻り値に返す関数型インターフェースを引数に取るsubscribeメソッドも用意しています。この引数となる関数型インターフェースには各通知時の処理が定義されています。そして、このsubscribeメソッドを使う場合は、戻り値のDisposableを使ってSubscriberの外から購読を解除することが可能になります。

Disposableを使って購読を解除する例
…… 

// 購読を開始する……(1)
Disposable disposable =
    flowable.subscribe(data -> System.out.println("data=" + data));

…… 

// 購読を解除する……(2)
disposable.dispose();

…… 
  1. 関数型インターフェースを引数に取るsubscribeメソッドを使い、戻り値としてDisposableを取得する。
  2. Disposableのdisposeメソッドを呼び出し購読を解除する。

 このようにRxJavaでは、引数なしや引数が関数型インターフェースのsubscribeメソッドが用意されており、Disposableを返すものが用意されています。このようなsubscribeメソッドは次のものになります。

Disposableを返すsubscribeメソッド
戻り値の型 メソッド 説明
Disposable subscribe() Flowableの処理だけ行いSubscriberは何もしない。
Disposable subscribe(Consumer<? super T> onNext) データの通知(onNext)を受け取った時の処理のみ引数で定義してあるように行う。
Disposable subscribe(Consumer<? super T>onNext, Consumer<? super Throwable> onError) データの通知(onNext)とエラーの通知(onError)を受け取った時の処理のみ引数で定義してあるように行う。
Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) データの通知(onNext)とエラーの通知(onError)と完了の通知(onComplete)を受け取った時の処理のみ引数で定義してあるように行う。
Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Subscription> onSubscribe) データの通知(onNext)とエラーの通知(onError)と完了の通知(onComplete)を受け取った時の処理を引数で定義してあるように行い、さらに購読開始時の処理(onSubscribe)も定義してあるように行う。

 これらのsubscribeメソッドでは、デフォルトでリクエストするデータ数にはLong.MAX_VALUEが設定されています(引数にonSubscribe時の関数型インターフェースを取るものを除く)。そのためFlowableの通知に対しデータ数の制限はなくなり、再度データ数をリクエストする必要はなくなります。

 さらにRxJava 2.xではこれらのsubscribeメソッドに加え、新たに購読を行うためのメソッドとして、Subscriberを引数に取り、戻り値も返すsubscribeWithメソッドを用意しています。このメソッドの実装は次のようになっており、引数にSubscriberを渡すと内部でそのSubscriberをsubscribeメソッドに渡して実行し、戻り値としてその引数を返すようになっています。

subscriberWithメソッドの実装
public final <E extends Subscriber<? super T>> E subscribeWith(E subscriber) {
  subscribe(subscriber);
  return subscriber;
}

 これは一見Subscriberを返してどうするのかと思うかもしれませんが、引数にResourceSubscriberやDisposableSubscriberなどのDisposableを実装しているSubscribeを渡すことで戻り値としてDisposableを受け取ることができます。

subscriberWithメソッドを使ってDisposableを戻り値を取得する例
Disposable disposable =
    flowable.subscribeWith(new ResourceSubscriber(){
      …… 
    });

 こうすることでRxJava 1.xのように購読後に購読解除が行えるDisposableを取得し、Subscriberの外部から購読の解除ができるようになります。また、外部からdisoposeメソッドが呼ばれるということは非同期で呼ばれる可能性もあるということです。ResourceSubscriberやDisposableSubscriberなどのRxJavaで実装されたSubscriberのクラスは、非同期で呼ばれることにも対応しているので、subscribeWithメソッドを使う際は、これらのRxJava側で実装されたクラスを使う方が良いでしょう。

observeOnメソッド

 RxJavaでは、データを通知する側と受け取る側とで処理を別々のスレッドで実行させる場合、observerOnメソッドの引数にSchedulerというスレッド管理を行うオブジェクトを設定することで、どのようなスレッド上で処理を行うのかを指定します。また、observeOnメソッドにはSchedulerに加えて他の引数を取るものがあり、元になるのは次のobserveOnメソッドになります。

observeOn(Scheduler scheduler, boolean delayError, int bufferSize)
observeOnメソッドの引数
引数No. 引数の型 説明
第1引数 Scheduler スレッド管理を行うクラス。
第2引数 boolean trueの場合はエラーが発生しても、そのことをすぐには通知せず、バッファしているデータを通知し終えてからエラーを通知する。falseの場合は発生したらすぐにエラーを通知する。デフォルトはfalse。
第3引数 int 通知待ちのデータをバッファするサイズ。デフォルトでは128。

 observeOnメソッドには、この第2引数と第3引数を省略してあるメソッドも用意してあり、その場合は各デフォルト値が設定された挙動になります。

 通知待ちのデータを扱う際に特に重要になるのが第3引数で、消費者に通知されるデータは、このバッファされた通知待ちのデータから取得されることになります。実際には、ここで指定した数値がFlowableに対してデータ数のリクエストを自動で行うようになっており、そのリクエストを受けて送られたデータがバッファされることになります。つまり「1」を指定すると、内部でrequest(1)が実行されていることになります。

 例えば、次のサンプルでは100ミリ秒ごとに0から始まる数値を通知するFlowableに対し、別スレッド上でデータを受け取るSubscriberが処理を300ミリ秒待ってから受け取ったデータを出力するようにしています。そのため、Flowableの処理がSubscriberの処理より早いので、通知待ちのデータが発生することになります。そこで、このサンプルでは通知待ちのデータを破棄するように設定しています。

public static void main(String[] args) throws Exception {
  
  Flowable<Long> flowable =
      // 100ミリ秒ごとに0から始まるデータを通知するFlowableを生成……(1)
      Flowable.interval(100L, TimeUnit.MILLISECONDS)
          // BackpressureMode.DROPを設定した時と同じ挙動にする……(2)
          .onBackpressureDrop();
  
  flowable
      // 非同期でデータを受け取るようにし、バッファサイズを1にする……(3)
      .observeOn(Schedulers.computation(), false, 1)
      // 購読する
      .subscribe(new DisposableSubscriber<Long>() {
        
        // データを受け取った際の処理
        @Override
        public void onNext(Long item) {
          // 300ミリ秒待つ……(4)
          try {
            Thread.sleep(300L);
          } catch (InterruptedException e) {
            e.printStackTrace();
            // 異常終了で終わる
            System.exit(1);
          }
          
          // 実行しているThread名の取得
          String threadName = Thread.currentThread().getName();
          // 受け取ったデータを出力する
          System.out.println(threadName + ": " + item);
        }
        
        …… 
      });
  
  // しばらく待つ
  Thread.sleep(2000L);
}
  1. intervalメソッドは100ミリ秒ごとに0から始まるデータを通知するFlowableを生成するメソッド。指定した時間ごとのに「0」「1」「2」……と順に数値を通知するFlowableが生成される。
  2. バッファサイズを超えて生成されたデータを破棄するようなバックプレッシャーのモードを設定する。
  3. 消費者側の処理を生産者側とは異なるスレッドで実行するように設定し、バッファするサイズを「1」にする。
  4. 重い処理をしていると見なし、300ミリ秒ほどスレッドを止める。※本来はThreadのsleepメソッドをSubscriber内で使うべきではない。
実行結果
RxComputationThreadPool-1: 0
RxComputationThreadPool-1: 4
RxComputationThreadPool-1: 8
RxComputationThreadPool-1: 12
RxComputationThreadPool-1: 16

 ※非同期処理のため実行するたびに結果は変わる可能性があります。

 このように通知されるまでに生成されたデータは破棄されていることが分かります。

 ところが次のように、このバッファサイズを「2」に変更すると、

flowable
  .observeOn(Schedulers.computation(), false, 2)

 次の結果になります。

実行結果
RxComputationThreadPool-1: 0
RxComputationThreadPool-1: 1
RxComputationThreadPool-1: 7
RxComputationThreadPool-1: 8
RxComputationThreadPool-1: 14

 ※非同期処理のため実行するたびに結果は変わる可能性があります。

 この結果より、リクエストするまでに時間があいているにも関わらず、「0」「1」と続けてデータが通知されていることから、リクエストした後に通知されるデータはキャッシュされたデータであり、キャッシュサイズを超えて通知待ちになっているデータが破棄されていることが分かります。

Observable(バックプレッシャー機能なし)を使った場合のサンプル

 RxJava 2.xのObservableとObserverの関係はFlowableとSubscriberの関係とほぼ同じですが、Reactive Streamsを実装しておらずバックプレッシャー機能がない点が異なっています。そして、クラスの構成や用意されているメソッドなどもバックプレッシャーに関わる機能を除いてFlowableとほぼ同じようになっているので、Flowableの使い方を知っていればObservableも同様に使うことができるようになります。それではObservableを使ったサンプルを見てみましょう。

 ここでのサンプルもFlowableのサンプルと同様に「Hello, World!」と「こんにちは、世界!」のデータを通知し、最後に完了したことを通知しています。そして通知を受け取るObserverはその通知の内容を出力するようにします。

Observableを使った場合
public static void main(String[] args) throws Exception {
  
  // あいさつの言葉を通知するObservableの生成
  Observable<String> observable =
      Observable.create(new ObservableOnSubscribe<String>() {
        
        @Override
        public void subscribe(ObservableEmitter<String> emitter)
            throws Exception {
          
          // 通知するデータ
          String[] datas = { "Hello, World!", "こんにちは、世界!" };
          
          for (String data : datas) {
            // 解除されている場合は処理をやめる
            if (emitter.isDisposed()) {
              return;
            }
          
            // データを通知する
            emitter.onNext(data);
          }
          
          // 完了したことを通知する
          emitter.onComplete();
        }
      });
  
  observable
      // 消費する側の処理を別スレッドで行うようにする
      .observeOn(Schedulers.computation())
      // 購読する
      .subscribe(new Observer<String>() {
        
        // subscribeメソッドが呼ばれた際の処理
        @Override
        public void onSubscribe(Disposable disposable) {
          // 何もしない
        }
        
        // データを受け取った際の処理
        @Override
        public void onNext(String item) {
          // 実行しているThread名の取得
          String threadName = Thread.currentThread().getName();
          // 受け取ったデータを出力する
          System.out.println(threadName + ": " + item);
        }
        
        // 完了を通知された際の処理
        @Override
        public void onComplete() {
          // 実行しているThread名の取得
          String threadName = Thread.currentThread().getName();
          System.out.println(threadName + ": 完了しました");
        }
        
        // エラーを通知された際の処理
        @Override
        public void onError(Throwable error) {
          error.printStackTrace();
        }
      });
  
  // しばらく待つ
  Thread.sleep(500L);
}
実行結果
RxComputationThreadPool-1: Hello, World!
RxComputationThreadPool-1: こんにちは、世界!
RxComputationThreadPool-1: 完了しました

 それでは、サンプルでは何をやっているのか見てみましょう。まずはデータを通知するObservableについて見ていきます。

Observable

 Observableが行っている処理は次のようになっています。

Observable.create(new ObservableOnSubscribe<String>() {  // ……(1)
  
  @Override
  public void subscribe(ObservableEmitter<String> emitter)  // ……(2)
      throws Exception {
    
    // 通知するデータ
    String[] datas = { "Hello, World!", "こんにちは、世界!" };
    
    for (String data : datas) {
      // 解除されている場合は処理をやめる
      if (emitter.isDisposed()) {  // ……(3)
        return;
      }
    
      // データを通知する
      emitter.onNext(data);
    }
    
    // 完了したことを通知する
    emitter.onComplete();
  }
}); // ……(4)

 このObservableとFlowableの構成上の違いはほぼなく、(1)でFlowableOnSubscribeの代わりにObservableOnSubscribeを受け取るようになっています。

 それに伴い(2)で見て分かるように、subscribeメソッドの引数でFlowableEmitterの代わりにObservableEmitterを受け取るようになっています。

 また(3)では、購読が解除されたのかどうかを確認するメソッドはFlowableEmitterの場合はisCanceledメソッドだったのに対し、ObservableEmitterの場合はisDisposedメソッドになり名前が異なっている点に注意してください。ただし、処理内容は両方とも購読の解除を行います。

 さらに(4)では、バックプレッシャーがなくなったのでcreateメソッドの引数は1つになり、Flowableのcreateメソッドのようにバックプレッシャーのオプションを指定する第2引数がなくなった点が異っています。そしてバックプレッシャーがないということは、データが生産されるたびにデータが通知されることになります。

 このようにFlowableとの違いはいくつかありますが、ObservableEmitterの実装クラスはFlowableEmitterと同様に購読が解除されていたら通知を行わないようになっていたり、nullを通知するとNullPointerExceptionを発生させたりと、バックプレッシャーの機能がないことを除けばFlowableの場合とほぼ同じになります。

Observer

 Observerは通知を受け取り、その通知に対する処理を行います。ObserverはRxJava 1.xからありますが、RxJava 2.xではonSubscribeメソッドが追加されています。このメソッドはFlowableを購読するSubscriberのonSubscribeメソッド同様に購読が開始する際に呼ばれるメソッドですが、ObservableとObserver間ではバックプレッシャーの機能はなくなっているので引数にSubscriptionではなくDisposableを受け取るようになっています。それではObserverの中身を見ていきましょう。

new Observer<String>() {
  
  // 購読が開始された際の処理
  @Override
  public void onSubscribe(Disposable disposable) {
    // 何もしない……(1)
  }
  
  // データを受け取った際の処理
  @Override
  public void onNext(String item) {
    // 実行しているThread名の取得
    String threadName = Thread.currentThread().getName();
    // 受け取ったデータを出力する
    System.out.println(threadName + ": " + item);
    
    // 通知するデータ数のリクエストは行わない……(2)
  }
  
  …… 
}

 (1)を見て分かるように、このサンプルでは購読開始時に処理を行うonSubscribeメソッドでDisposableを受け取っても、特に何もしていません。もし、購読の途中で購読を解除する必要がある場合は受け取ったDisposableをObserver内で保持しないといけませんが、今回のサンプルでは購読解除しないので何もしていません。

 さらに(2)を見て分かるように、バックプレッシャー機能がないのでデータを受け取った際の処理のみ行っています。仮に何らかのタイミングで購読を解除しないといけない場合に(1)のonSubscribeメソッドで受け取ったDisposableをObserver内で保持し、そのDisposableを使ってonNextメソッド内で購読を解除するためのdisposeメソッドを呼ぶことになります。

 また、Subscriberと同様にObserverにもResourceObserverとDisposableObserverが用意されており、基本的には次のメソッドのみ実装すればよくなります。

  1. onNext(T data)
  2. onError(Throwable error)
  3. onComplete()

 さらにResourceObserverとDisposableObserverも同様にonSubscribeメソッドがfinalでオーバーライドされており、直接Disposableにアクセスできないようになっています。代わりに購読を解除する際の次のメソッドが用意されています。

Disposableの機能を呼び出すメソッド
メソッド 説明
dispose() 購読を解除する。
isDisposed() 購読が解除されていればtrueを返す。

 そして購読開始時の処理としてonStartメソッドが用意されており、購読開始時に何か行う場合はこのメソッドをオーバーライドします。デフォルトでは次のように何も処理を行わないメソッドになっています。

ResourceObserver/DisposableObserverのonStartメソッドの実装
protected void onStart() { }

subscribeメソッド

 RxJava 2.xのObserverを引数に受け取るsubscribeメソッドは戻り値を返さないようになっています。そのため、このsubscribeメソッドを呼んだ場合はObserverの内部で購読の解除を行うような設計になっています。

 ただし、Flowableの場合と同様に、各通知時の関数型インターフェースを受け取るsubscribeメソッドや戻り値を返すsubscribeWithメソッドが用意されており、このようなsubscribeメソッドの場合は戻り値にDisposableを返します。そして、この戻り値のDisposableを使って、Observerの外から購読を解除することが可能になります。

まとめ

 今回はRxJava 2.xを使った簡単な実装を行い、RxJavaの基本的な仕組みを実装ベースで見てきました。次回はRxJavaについて何ができるのかについて、もう少し踏み込んで見ていきます。