最終更新日:2020/09/02 原本2016-10-11

RxJava 2.xで導入されたReactive Streams

RxJavaによるリアクティブプログラミング入門(3)

 この連載はRxJavaを使って、リアクティブプログラミングにおけるポイントやRxJavaが持つ機能について学んでいくことを目的としています。前回は現在の安定板であるRxJava 1.xで実際に簡単なサンプルを実装してみましたが、今回は次期バージョンとなるRxJava 2.xを使って実装するためのポイントについて見ていきます。特に、2.xになって対応することになったReactive Streamsとは何か、バージョンが上がったことによって主にRxJavaの何が変わったのかについて見ていきます。

対象読者

  • Java経験者(初心者可)
  • RxJava未経験者
  • リアクティブプログラミング未経験者

 ※ただし、前回までの連載を読んでいる前提です。

2.xへのバージョンアップについて

 RxJavaのバージョンが1.xから2.xに上がった背景にReactive Streamsの適用があります。Reactive Streamsとはどのライブラリやフレームワークを使っているのかに関係なく、データストリームを非同期で扱えるようにするための共通の仕組みを提供するもので、そのためのインタフェースを提供しています。

 ちなみにReactive Streamsが提供するのはインタフェースのみで、実装は各ライブラリやフレームワークに委ねられます。

 RxJavaの今回のバージョンアップでは、このReactive Streamsの実装とともに、まったく新しく実装しなおすことでパフォーマンスの改善も図っています。基本的にはReactive Streams自体がRxJavaの影響を強く受けているので、根本的な仕組み(データストリームの扱いや生産者と消費者の関係など)には大きな違いはありませんが、後で説明するバックプレッシャーの仕様に基づき、それに関するAPIに対しての差異が入っています。そのため、1.xから2.xに移行する際に単にパッケージ名やクラス名を変えるだけではなく、それらのAPI周りの変更も必要になります。また、RxJava内部の作り自体はスクラッチから始めたと公式に述べられているように、今までとは別物と思った方がよいでしょう。

 さらに、前のバージョンである1.xでは他のライブラリに対しての依存はありませんでしたが、今回はReactive Streamsを実装しているため、Reactive Streamsのjarが必須になります。

 また、RxJava 1.xとRxJava 2.xではルートとなるパッケージが完全に異なっています。

バージョン パッケージ
1.x rx
2.x io.reactivex

 そのため、RxJavaの異なる2つのバージョンを同じプロジェクト内に混在させることは可能ですが、そのプロジェクトがRxJavaに依存しているライブラリーを使っている場合、そのライブラリーがどのように対応しているかによって混在するのが難しい場合もあります。また、異なるバージョンを混在させることが可能でありRxJava 1.xもしばらくは公式にサポートされるようになっていても、運用や保守を考えると、混在させるのはRxJava 2.xへの移行時など一時的な場合に限定したほうがよいでしょう。また異なるバージョンのクラスを同じソース上に混ぜ合わせたりするのも後で管理が大変になり、クラス名の衝突も発生したりするので避けた方がよいです。

 また、「RxJava」の表記について今後の連載では、「RxJava」とだけ表記している場合は基本的にバージョン2.xについて述べていることにします。もしバージョンの違いについて明確にする場合は、例えばバージョン1.xについて述べる場合は「RxJava 1.x」とバージョンを表記するようにしています。

Reactve Streamsとは

 Reactive Streamsとは、前述したようにリアクティブプログラミングを行うための共通のインタフェースを提供するものです。Reactive Streamsは、このインタフェースを介して次の特徴を持つ仕組みを提供します。

  1. データストリーム(Data Stream)
  2. シーケンシャル(逐次的)な通知
  3. 非同期なデータの受け渡し
  4. バックプレッシャー(Backpressure)

 1から3に関しては、すでに前回までの連載で解説したRxJava 1.xにもあった機能なので今回は説明を省きますが、バックプレッシャーについては新しいコンセプトなので詳しく見ていきましょう。ちなみにバックプレッシャーの機能はRxJava 1.xにも含まれています。

バックプレッシャー(Backpressure)

 バックプレッシャーについて知るためには、まずデータを通知する生産者と受け取ったデータを処理する消費者がそれぞれ別のスレッド上で処理をしている状況で、生産者の処理スピードが速く、消費者の処理スピードが遅い場合を想像してください。

非同期なデータの通知
非同期なデータの通知

 この状況の場合、生産者がどんなにデータを通知しても消費者の処理が追い付かずに処理待ちのデータがどんどん貯まっていってしまいます。例えば、生産者が1秒間に100件のデータを通知するのに対し、消費者が1秒間に1件しかデータを処理できない場合、1秒後には99件のデータが残ることになり、さらに新たな100件のデータが加えられることになってしまいます。このような状況下では、時間が経つごとに処理待ちのデータが貯まっていってしまい、最新の結果を素早く受け取ることができなくなり、最終的にメモリーが足らなくなってシステムがクラッシュするなどの問題が発生する可能性があります。

 この問題を解決するのがバックプレッシャーで、生産者が通知するデータの量を抑制することで、消費者が受け取ったデータを処理しきれず大量のデータが処理待ちの状況になってしまうことを避けられるようにします。これを実現するために、消費者はデータを受け取る前に処理しきれるデータ数を生産者にリクエストし、生産者はそのデータ数分のデータを通知するようにします。

 バックプレッシャーの処理の流れは次のようになります。

  1. 消費者が指定したデータ数だけデータを通知するようにリクエストする
  2. 生産者がリクエストを受けたデータ数だけデータを通知する。
  3. 生産者はリクエスト分のデータを通知したら通知を止めるが、データの生産は続ける。
  4. 消費者が受け取った最後のデータを処理したら、再度データ数をリクエストする。
  5. リクエスト分のデータを再度通知し始める。

 このようにすべてのデータを通知し終えるまで、生産者はリクエストを受けた分だけのデータを通知し、消費者は処理しきれるデータ数をリクエストをすることを繰り返し続けます。また、購読を開始したタイミングで消費者がデータ数をリクエストすることで、生産者がデータの通知を始めることができるようになります。

バックプレッシャーのシーケンス図
バックプレッシャーのシーケンス図

 それでは生産者が生成しても通知ができていないデータはどうなるのでしょうか? これはバックプレッシャーの設定によりますが、新たなリクエストが来るまでに生成したデータを通知することなく破棄したり、次のリクエストが来るまで貯めておいたりできます。

データを破棄する場合
データを破棄する場合
データを貯める場合
データを貯める場合

 この他にも処理待ちのデータに対するバックプレッシャーのオプションはありますが、大まかに分類すると破棄するかバッファするかになります。ただし、処理されていないデータをバッファする場合、すべてのデータをバッファすると結局はバックプレッシャーがない場合と同じ問題に直面するため、バッファできるデータ数を指定しておくことが基本になります。

 RxJavaではバッファのサイズが指定でき、指定したバッファサイズを超えた場合にすぐエラーを通知することができます。そうすることで早い段階でエラーを返すことができ、通知できずバッファされているデータが過多になっていることが原因で問題が発生していることを素早く伝えることができます。また、データを破棄する場合もRxJavaでは内部のバッファサイズを超したものに対して破棄するようになっている点に注意が必要です。

 また、サンプルや図などでは分かりやすさを優先して少ないデータ数でのリクエストを行っていますが、少ないデータ数でのリクエストを頻繁にやり取りすることはオーバーヘッドになるので、実際には極端に小さい単位で頻繁にリクエストするのは避けた方がよいでしょう。

 ちなみにRxJava 1.xにもバックプレッシャーの機能は用意されています。すごく簡単に説明すると、Observerの実装クラスであるSubscriberからデータ数のリクエストを行い、Observableからリクエストしたデータ数分のデータを通知できるようになっています。

Reactive Streamsの構成

 Reactive Streamsの構成はRxJava 1.xの構成とかなり似ていて、生産者としてデータを生産し通知するPublisherと消費者としてデータを受け取り処理をするSubscriberとの関係で成り立っています。また、購読(subscribe)している生産者からのデータ数のリクエストや購読を解除するためのSubscriptionも存在し、Subscriptionを通して必要なデータ数のリクエストや購読を途中で解除することが可能になっています。

Reactive Streamsの構成

Reactive Streamsの構成

 このPublisherとSubscriberの関係を、前回説明したRxJava 1.xのObservableとObserverの関係と比べると次のようになります。

Reactive StreamsとRxJava 1.xの関係の比較
Reactive Streams RxJava 1.x 説明
Publisher Observable データを生産し通知するオブジェクト。
Subscriber Observer 受け取った通知をもとに処理を行うオブジェクト。RxJava 1.xではこのObserverを実装したSubscriberクラスが通知するデータ数をリクエストする機能も持つ。
Subscription Subscription 購読を解除することができるオブジェクトであり、Reactive Streamsでは通知するデータ数をリクエストする機能も持つオブジェクト。

 Reactive Streamsが提供するAPIの各インタフェースは次のように宣言されています。

Publisher.java
/** データを通知する生産者 */
public interface Publisher<T> {
  /** 通知を受け取るSubscriberを登録する */
  public void subscribe(Subscriber<? super T> subscriber);
}
Subscriber.java
/** データを受け取り処理をする消費者 */
public interface Subscriber<T> {
  /** 購読開始時の処理をする */
  public void onSubscribe(Subscription subscription);
  
  /** データの通知を受け取った際の処理をする */
  public void onNext(T item);
  
  /** エラーの通知を受け取った際の処理をする */
  public void onError(Throwable error);
  
  /** 完了の通知を受け取った際の処理をする */
  public void onComplete();
}
Subscription.java
/** 生産者と消費者をつなぐためのインタフェース */
public interface Subscription {
  /** 通知されるデータ数をリクエストする */
  public void request(long num);
  
  /** 購読をキャンセル(解除)する */
  public void cancel();
}

 Reactive Streamsの構成はRxJava 1.xと異なる部分もあり、まず、大きく異なる部分としてRxJava 1.xではSubscriptionがsubscribeメソッドの戻り値となっていたのに対し、Reactive Streamsでは、Subscriptionがデータの消費者であるSubscriberにonSubscribeメソッドの引数として渡されている点が違います。このSubscriberに新しく追加されたonSubscribeメソッドは、Publisherが購読された際に最初に通知されるイベントで一度だけしか呼ばれないものです。Reactive StreamsではSubscriptionがこのonSubscribeメソッド経由で渡ってきます。

 このSubscriptionはRxJava 1.xでは購読解除の機能しかありませんでしたが、Reactive Streamsではバックプレッシャーを実現するために、データ数をリクエストするrequestメソッドが追加されています。このrequestメソッドを通してSubscriberが処理できるデータ数をリクエストし、Publisherはリクエストされたデータ数分のデータをSubscriberに通知します。

 そのため、SubscriberはonSubscribeメソッド内でSubscriptionのrequestメソッドを呼んで最初に受け取るデータ数をリクエストします。そうすることで、Publsihserがデータの通知を開始できることになります。そして、Publsihserがリクエストした分のデータを通知した後は通知をやめ、次のリクエストが来るのを待ちます。ただし待っている間もデータを生産し続ける状態になっています。

 このように生産者がデータの通知や生成を行っている間、Subscriberは受け取ったデータを処理します。そして、リクエストした分のデータを処理した後に次に受け取るデータ数を再度リクエストします。これを繰り返すことで、Subscriberの処理ペースに合わせたデータの受け取りが可能になります。

 ちなみに、onNextメソッドでSubscriptionを使うには、onSubscribeメソッドで受け取ったSubscriptionをSubscriberの内部で保持しないといけません。

publisher.subscribe(new Subscriber<T>() {
  /** Subscriber内で保持するSubscription */
  private Subscription subscription;
  
  @Override
  public void onSubscribe(Subscription subscription) {
    // 受け取ったSubscriptionをSubscriber内に保持する
    this.subscription = subscription;
    
    // 最初に通知を受けるデータ数をリクエストする
    this.subscription.request(num);
    …略
  }
  
  @Override
  public void onNext(T item) {
    …略
    // リクエストした分のデータを処理したら次のデータ数をリクエストする
    subscription.request(num);
  }
  
  …略
});

 また、このsubscribeメソッドの戻り値がなくなり(voidになり)、onSubscribeメソッド経由でSubscriptionを受け取るようになったということは、RxJava 1.xでは購読解除を生産者と消費者の連携の外部から行っていたのに対し、Reactive StreamsではSubscriber内の閉じた環境で購読を解除する設計に変更されたことになります。

 まとめると、このPublisherとSubscriber間のプロトコルは次の4つになり、

  • onSubscribe
  • onNext
  • onError
  • onComplete

 正常系(すべてのデータを問題なく通知して完了した場合)の処理の流れは次のようになります。

Reactive Streamsの正常時のシーケンス図
Reactive Streamsの正常時のシーケンス図
  1. SubscriberがPublisherを購読(subscribe)する。
  2. PublisherがSubscriptionを生成し購読開始(onSubscribe)したことを通知する。
  3. Subscriberが受け取ったSubscriptionを通して最初に受け取るデータ数をリクエストする。
  4. Publisherがデータを通知する。
  5. Subscriberが受け取ったデータを処理する。
  6. Subscriberがリクエストしたデータ数分の処理を行ったら、再度通知するデータ数をリクエストする。
  7. Publsisherがすべてのデータを通知し終えたら、完了を通知する。
  8. Subscriberが完了の通知を受け取り処理をする。

 また、見逃しがちですが、完了の通知を受け取った際の処理を行うonCompleteメソッドの名前がRxJava 1.xと異なり、最後の「d」がなくなっていることに注意してください。

 その他にもReactive Streamsで名前が変わったメソッドにSubscriptionの「cancel」メソッドがあります。これはRxJava 1.xのときは「unsubscribe」と呼ばれていたメソッドで購読を解除する際に使われるメソッドです。

 その他にも、今回は説明しませんがProcessorというPublisherとSubscriberの両方を継承したインタフェースもあります。もし、Reactive Streamsについてさらに詳しく知りたい場合は、英語になりますがReactive Streamsの公式サイトやGitHubを参照してください。

Reactive Streamsのルール

 Reactive Streamsは、RxJavaから影響を受けているように多くのルールがRxJava 1.xと同じです。根本となるルールとして次のものがあります。

  • 通知はシーケンシャル(逐次的)に行われる。つまり、複数の通知を同時に行わない。
  • Publisherの処理は完了(onComplete)もしくはエラー(onError)を通知することで終了する。
  • 完了もしくはエラーを通知したらそれ以降は通知を行わない。

 ただし、RxJava 1.xとReactive Streamsとの間で異なる部分もあり、大きな違いの一つはnullが通知できなくなった点でしょう。Reactive Streamsではデータの通知やエラーの通知をする際にnullを通知しようとするとNullPointerExceptionを発生させる仕様になっています。

  • nullを通知することはできない。

 この変更はRxJava 1.xからの移行をする際に大きな影響がでる可能性があります。基本的にRxJava 1.xの場合でも対象外のデータを除外するfilterメソッドを使うなどしてnullをそのまま扱うことはあまりなかったとは思いますが、それでも生産者はfilterメソッドの前にnullを通知していることになります。そのためRxJava 1.xではObservableのオペレータなどで行っていたnullに対する除外処理を、RxJava 2.xでは生産者が通知する処理内で行うように変更しないといけなくなっています。

 さらにRxJavaでは、先のルールの一つである通知をシーケンシャルに行う点について、SubscriberのonSubscribeメソッドでrequestメソッドを呼んだタイミングでデータが通知されることもあり、SubscriberのonNextメソッドがonSubscribeメソッドの処理が終わる前に実行されることがあるので注意が必要です。

 さらに、これらに加えバックプレッシャーに関した次のルールも覚えておかなければなりません。

  • Publisherは最初にonSubscribeの通知を行う。
  • 購読したSubscriberは一度だけonSubscribeの通知を受け取る。
  • リクエストするデータ数が0以下の場合はIllegalArgumentExceptionをエラーとして通知する。
  • リクエストするデータ数がLong.MAX_VALUEの場合、通知するデータ数は無制限と見なすことができる。
  • データ数のリクエストはSubscriberのonSubscribeメソッドかonNextメソッドから呼ばれなければならない。
  • データ数のリクエストはSubscriptionが同じスレッド上もしくは同期がとられた状態で行わなければならない。

 これらの中で、リクエストするデータ数が0以下の場合に発生するIllegalArgumentExceptionはRxJavaだとSubscriberにonErrorとして通知されずに単にエラーのスタックトレースが出力されるだけなので注意してください。また、リクエストするデータ数がLong.MAX_VALUEの場合、RxJavaでは通知できるデータ数は無制限と見なされ、再度データ数をリクエストする必要がなくなり、Publisherは生成したすべてのデータを通知するようになります。

 またRxJavaの場合、バックプレッシャーのデータ数はリクエストを受けるたびに加算されるようになっています。つまり、リクエストしたデータ数が残っている段階でさらにデータ数をリクエストを受けた場合、残っているデータ数に対し新たにリクエストしたデータ数が加算されます。例えば最初に「データ数 = 100」とリクエストして1件のデータを処理した後にさらに「データ数 = 100」とリクエストした場合、通知するデータ数の上限は199となります。このことを意識していないと、意図せずリクエストしたデータ数がLong.MAX_VALUEになってしまい、無制限にデータを通知することになってしまうので注意が必要です。

 そして、購読をキャンセル際のルールもあり、次のようになっています。

  • 購読をキャンセルする場合はSubscriptionが同じスレッド上もしくは同期がとられた状態で行わなければならない。
  • キャンセルを呼び出した後は同じSubscriptionのrequestメソッドおよびcacnelメソッドは無視される。
  • キャンセル時の処理はスレッドセーフでなければならない。

RxJava 2.xでのReactive Streams対応とRxJava 1.xからの変更

 前に述べたように、RxJavaが2.xにバージョンアップする目的の一つはReactive Streamsの対応です。そのためRxJava 1.xでは他のライブラリの依存はありませんでしたが、RxJava 2.xではReactive Streamsが提供するAPIに依存することになります。

 また、RxJava 2.xへのバージョンアップによる大きな変更に一つに、生産者と消費者の構成がRxJava 1.xではObservableとObserverの構成の一つだったのがRxJavaでは2つのグループに分割されたことがあります。このグループの一つはReactive Streamsの仕様を実装したFlowableとSubscriberの構成であり、もう一つはバックプレッシャーの機能がないObservableとObserverの関係です。

 FlowableはReactive Streamsの生産者であるPublisherの実装クラスで、SubscriberはReactiveStreamsのものになります。それに対しRxJava 2.xのObservableはReactive Streamsの実装を行っておらず、Reactive StreamsのAPIと連携することはまったくありません。

RxJava 1.xからRxJava 2.xの遷移
RxJava 1.xからRxJava 2.xの遷移

 FlowableとSubscriberの構成はReactive StreamsのAPIを使っており、Publisherの実装クラスがFlowableになっています。

Flowable-Subscriberの構成

Flowable-Subscriberの構成

 これに対し、ObservableとObserverの構成はFlowableとSubscriberの構成と同じような作りになっていますが、Reactive Streamsの対応は行っておらずバックプレッシャー機能がない構成となっています。そのため、FlowableとSubscriberの構成で使われているSubscriptionの代わりにDisposableという購読を破棄するための機能しか持っていないインタフェースを扱います。

Observable-Observerの構成

Observable-Observerの構成

 このDisposableは次のようになっており、disposeメソッドがSubscriberのcancelメソッドに相当し購読を破棄するために使われます。

/** 購読を解除するためのインタフェース */
public interface Disposable {
  /** 購読を破棄する */
  void dispose();
  
  /** すでに購読が破棄されているならtrueを返す */
  boolean isDisposed();
}

 RxJava 1.xからあるObservableとObserverなのですが、RxJava 2.xからはAPIも若干変更され、Flowable同様にObservableのsubscribeメソッドには戻り値がなくなっています。さらにSubscriber同様にObserverは新たにonSubscribeメソッドが追加されています。

 そして、前回は紹介していなかったのですが、RxJava 1.xのObservableとObserverの構成にもバックプレッシャーの機能は持っていたのですが、RxJava 2.xからはRxJava 1.xにあった「onBackpressure」で始まるメソッドなどのバックプレッシャーの機能が削除されています。

 このようにRxJava 2.xでは、Reactive Streamsに対応してバックプレッシャーの機能を持つFlowableの構成と、Reactive Streamsに対応せずバックプレッシャーの機能がないObservableの構成とに分割されたことがRxJava 1.xからの大きな構成の変化になります。そして、SubscriberやObserverを引数に取るsubscribeメソッドの戻り値がなくなったことにより、購読を途中でやめるにはSubscriberやObserverの内部でSubscriptionやDisposableを使って行うようにデザインが変更されています。

RxJava独自のsubscribeメソッド

 それでは、データが通知された際の処理しか必要ないような場合はどうなるのでしょうか? 完了時やエラー時に何もしないSubscriberを毎回生成しないといけないのでしょうか? この問題に対しRxJava 1.xでは、データ通知時の処理を行う関数型インタフェースを引数に持つsubscribeメソッドを提供してきました。それではRxJava 2.xではどうなのでしょうか?

 実はRxJava 2.xでも関数型インタフェースを受け取るsubscribeメソッドは用意されています。しかし、SusbcriberやObserverを引数に持つsubscribeメソッドのように戻り値がないようにすると、購読を途中でやめる手段がなくなってしまいます。そのため、関数型インタフェースを引数に取るsubscribeメソッドでは戻り値としてDisposableを返すようになっています。

// onNext時の処理(受け取ったデータを出力する)だけの例
Disposable disposable = flowable.subscribe(System.out::println);

 このDisposableは購読をやめさせる機能を持つインタフェースでdisposeメソッドを呼ぶことで処理を開始したFloawableやObservableに対して処理をやめさせられることができるようになります。また、このDisposableのdisposeメソッドはSubscriptionのcancelメソッドと同等で、実際にFlowableのsubscribeメソッドから生成されたDisposableのdisposeメソッドを呼ぶと内部でSubscriptionのcancelメソッドを呼ぶようになっています。

 また、Flowableで関数型インタフェースを受け取るsubscribeメソッドを使った場合、デフォルトではonSubscribe時にLong.MAX_VALUEのリクエストがされます。そのため、通知するデータ数の制限がなくなっているため、次のデータ数をリクエストをする必要がありません。

まとめ

 今回はRxJava 2.xが対応することにしたReactive Stremasについて見ていき、さらにRxJava 2.xがReactive Streamsをどのように対応し、それに伴いRxJava 1.xからどのように変わったのかについて見てきました。次回は、前回実装したRxJava 1.xのサンプルをRxJava 2.xではどのように実装するのか見ていきます。