対象読者
- Java経験者(初心者可)
- RxJava未経験者
- リアクティブプログラミング未経験者
※ただし、前回までの連載を読んでいる前提です。
サンプルの作成
前回はRxJavaやリアクティブプログラミングの概要について見てきました。今回はRxJavaを使った簡単なサンプルを実際に作成してみましょう。RxJavaは次期バージョンの2.xの安定板リリースが年末に予定されていますが、今回は執筆時点での安定板である1.xで実装します。
ちなみに2.xのリリースのスケジュールなのですが、前回のスケジュールから変更されて、今回の執筆時点では次のようになっており、いくつかの新たなRC版のリリースが追加されています。
- 2016/08/25:RC(Release Candidate)版1のリリース
- 2016/09/05:RC(Release Candidate)版2のリリース
- 2016/09/23:RC(Release Candidate)版3のリリース
- 2016/10/07:RC(Release Candidate)版4のリリース
- 2016/10/21:RC(Release Candidate)版5のリリース
- 2016/10/29:安定版のリリース(Stable Release)
環境構築
RxJavaは他のライブラリへの依存がないので、RxJavaのjarをダウンロードしパスを通すだけでRxJavaを使えるようになります。今回はバージョン1.xを対象にしており、バージョン2.xでは環境構築の方法が若干変わるので注意してください。
RxJavaのjarはMavenから取得することができます。検索サイトから、
g:"io.reactivex" AND a:"rxjava"
で検索すると、RxJavaの最新のjarがダウンロードできるようになります。

また、Mavenプロジェクトのpom.xmlに次のdependencyを追加してjarを取得することも可能です。今回は執筆時点での最新版である1.1.9を使います。
<dependency> <groupId>io.reactivex</groupId> <artifactId>rxjava</artifactId> <version>1.1.9</version> </dependency>
同様にGradleからも追加が可能です。dependenciesに下記を追加してください。
dependencies { …略 compile 'io.reactivex:rxjava:1.1.9' }
RxJavaのソースコードはGitHubで公開されています。このGitHubのWikiページの「Getting Started」に環境構築の情報が載っています。
英語の情報ですが、内容が設定方法なので、それほど英語が得意でなくても理解できるかと思います。もし紹介した以外の設定方法を知りたい場合や、設定方法が変更された場合は参照してください。
最初のサンプルの作成
今回作るサンプルは、"Hello, World!"と"こんにちは、世界!"を通知して、受け取ったデータを標準出力し、すべてのデータを受け取った後に完了時の処理として"完了しました"と出力するものです。Observableがデータを通知し、Observerが各通知を受け取り処理します。今回はObservableが"Cold"のため、ObserverがObservableを購読すると処理を開始します。今回の処理は非同期で行わず、メインのスレッドのみで実行されています。このサンプルが行う処理の流れは次のとおりです。

- ObserverがObservableを購読し、Observableの処理を開始する。
- Observableが文字列"Hello, World!"を通知する。
- Observerがデータを受け取り、"Hello, World!"と標準出力する。
- Observableが文字列"こんにちは、世界!"を通知する。
- Observerがデータを受け取り、"こんにちは、世界!"と標準出力する。
- すべてのデータを通知した後にObservableが完了(onCompleted)したことを通知する。
- 完了の通知を受け取ったObserverが"完了しました"と標準出力する。
ちなみに後で非同期で処理を行った際の動作を確認するため、標準出力を行う際に実行されているスレッドの名前も出力しています。
// あいさつの言葉を標準出力するサンプル public static void main(String[] args) throws Exception { // あいさつの言葉を通知するObservableの生成 Observable<String> observableGreeting = Observable.create(new OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { // 購読解除されている場合は処理をやめる if (subscriber.isUnsubscribed()) { return; } // 1回目の通知をする subscriber.onNext("Hello, World!"); // 2回目の通知をする subscriber.onNext("こんにちは、世界!"); // 購読解除されていない場合 if (!subscriber.isUnsubscribed()) { // 完了したことを通知する subscriber.onCompleted(); } } }); // Observableを購読し処理を開始する observableGreeting.subscribe(new Observer<String>() { // Observableからのデータを受け取った際の処理 @Override public void onNext(String item) { // 実行しているThread名の取得 String threadName = Thread.currentThread().getName(); // Observableからのデータをそのまま標準出力する System.out.println(threadName + ": " + item); } // Observableから完了を通知された際の処理 @Override public void onCompleted() { // 実行しているThread名の取得 String threadName = Thread.currentThread().getName(); System.out.println(threadName + ": 完了しました"); } // Observableからエラーを通知された際の処理 @Override public void onError(Throwable e) { e.printStackTrace(); } }); }
main: Hello, World! main: こんにちは、世界! main: 完了しました
それでは、サンプルでは何をやっているのか見てみましょう。まずはデータを通知するObservableについて見ていきます。
Observable
Observableが行っている処理は次のようになっています。
// あいさつの言葉を通知するObservableの生成 Observable<String> observableGreeting = Observable.create(new OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { // 購読解除されている場合は処理をやめる ・・・(1) if (subscriber.isUnsubscribed()) { return; } // 1回目の通知をする ・・・(2) subscriber.onNext("Hello, World!"); // 2回目の通知をする ・・・(3) subscriber.onNext("こんにちは、世界!"); // 購読解除されていない場合 ・・・(4) if (!subscriber.isUnsubscribed()) { // 完了したことを通知する subscriber.onCompleted(); } // 完了通知後は何もしない ・・・(5) } });
実際にデータを通知する処理は、createメソッドの引数であるOnSubscribeインタフェースのcallメソッドで行われており、callメソッドの引数であるSubscriberを通じてObserverに通知を行っています。
まず、このcallメソッド内で、(1)で最初にObservableが購読解除(unsubscribe)されていないかをSubscriberのisUnsubscribedメソッドを使って確認し、購読解除されていたら処理を終えるようにしています。この確認処理は、もしすでに購読解除されているにも関わらず処理を続行した場合、Observerにデータが通知され続けることになるのを避けるために行っています。Observerが購読解除したにも関わらずデータを受け続けることは、無駄にCPUやメモリのリソースを消費することになってしまいます。そのため、購読解除されたら処理を終えるようにしています。このサンプルでは、そもそも購読解除を行わないのでObservableが購読解除されることはありませんが、仮に後で購読解除された場合を考えて、購読解除された際の処理を明示しておいた方が良いでしょう。
次にObservableは(2)と(3)でデータを通知する処理を行っています。createメソッド内ではSubscriberのonNextメソッドの引数に通知するデータを渡すことで、そのデータをObserverに通知することができます。ここでは、"Hello, World!"と"こんにちは、世界!"の文字列をonNextメソッドを呼んだ順に通知しています。
次にすべてのデータを通知し終えた後にSubscriberのonCompletedメソッドを呼ぶことでObserverに処理が完了したことを通知しています。(4)ではonCompletedメソッドを呼ぶ前に購読解除されていないかを確認し、購読解除されていない場合のみ完了を通知するようにしています。ここもまた、購読解除のチェックをしていなければ、購読解除を行ってもObserverに完了が通知されてしまうことになります。
そして最後の(5)ではonCompletedメソッドを呼んだ後は何もしていない点に注目してください。createメソッドを実装する際の重要な点として、RxJavaでは完了の通知後にObserverには何も通知しないことになっています。そのため、今回のサンプルではonCompletedメソッドを呼んだ後に何も処理をしないようにしています。これは、もしonCompletedメソッドを呼んだ後に、何らかの処理を行いエラーが発生しても、Observerにはエラーの通知が届かず適切な処理が行えない可能性があるためです。もし、すべてのデータを通知した後に何らかの後処理を行わないといけない場合は、onCompletedメソッドを呼ぶ前にその処理を行い、処理が終わった後にonCompletedメソッドを呼ぶようにしたほうが良いでしょう。
また、今回は使っていないですが、もし何らかのエラーをキャッチしないといけない場合、SubscriberのonErrorメソッドにそのエラーオブジェクトを渡すことで、Observerにエラーの通知が行え、そのエラーオブジェクトを渡すことができます。このonErrorメソッドを呼んだ後はonCompletedメソッドと同様にObserverには何も通知されない前提になっているので、速やかに処理をやめるようにしてください。
また、Observableのcreateメソッドの引数であるOnSubscribeインタフェースは関数型インタフェースなので、ラムダ式が使える環境の場合、ラムダ式を使って実装することも可能です。
Observable<String> observableGreeting = Observable.create(subscriber -> { …略 });
さて、Observableでデータを通知する準備ができたら、そのデータをObserverに購読(subscribe)させなければなりません。Observerに購読させるには、Observableのsubscribeメソッドを使います。そのsubscribeメソッドの引数にObserverを渡すことで、その引数のObserverはObservableからの通知を受け取れるようになります。そしてObservableが"Cold"である場合、subscribeメソッドが呼ばれると、Observableは処理を開始しObserverにデータを通知します。
Observer
それでは、通知されたデータを受け取るObserverを見てみましょう。Observerは次の3つのメソッドの実装が必要になります。
new Observer<String>() { // Observableからのデータを受け取った際の処理 @Override public void onNext(String item) { …略 } // Observableから完了を通知された際の処理 @Override public void onCompleted() { …略 } // Observableからエラーを通知された際の処理 @Override public void onError(Throwable e) { …略 } })
まず、Observableからデータが通知され、そのデータを受け取るとObserverのonNextメソッドが実行されます。onNextメソッドの引数item
はObservableから通知されたデータです。今回はObservableのcreateメソッド内でSubscriberのonNextメソッドの引数に渡されたデータがそのままObserverに通知されています。Observerは自身のonNextメソッド内で受け取ったこのデータを使って何らかの処理を行います。今回のサンプルでは実行されているスレッド名とともに受け取ったデータを標準出力しています。
そして、すべてのデータを通知し終えるとObservableは最後に完了(onCompleted)を通知します。Observableから完了が通知されると、ObserverはonCompletedメソッドを実行します。今回のサンプルでのonCompletedメソッドでは、実行されているスレッド名とともに"完了しました"と標準出力しています。そして、データを通知するObservableが正しく実装されている場合、このonCompletedメソッドが呼ばれた後は、他のonNextメソッドやonErrorメソッドが呼ばれることはありません。
最後にObservableからエラーを通知もしくはエラーが発生した際にObserverのonErrorメソッドが呼ばれます。ObserverのonErrorメソッドの引数にはObservableで発生したエラーオブジェクト(Throwable)が渡されてきます。そして、Observerはこのエラーオブジェクトより判断して適切なエラー処理を行います。今回はスタックトレースの出力のみ行っています。onErrorメソッドもonCompletedメソッドと同様に、呼ばれた後はObserverの他のメソッドが呼ばれることはありません。
サンプルのまとめ
今回のサンプルより、ObservableとObserverのデータ通知の関係をまとめると次のようになります。

-
Observable内の
subscriber.onNext(item)
にデータitem
を渡すと、ObserverのonNext(T item)
が実行され、引数に通知されたデータが渡ってくる。 -
Observable内の
subscriber.onCompleted()
を呼ぶと、ObserverのonCompleted()
が実行される。 -
Observable内でエラーが発生した場合、Observerの
onError(throwable error)
が実行され、引数に発生したエラーオブジェクトが渡ってくる。
主なポイントはこのようになりますが、今回のようにcreateメソッドを使ってObservableを生成する場合は、次のことも考慮しなくてはいけません。
- 完了もしくはエラーを通知した後はすみやかに処理を終える。
- 購読解除時の処理を入れる。
また、Observableの処理に終わりがある場合は、onCompletedメソッドを呼んで完了の通知を行うことを忘れないようにしないといけません。
サンプルの簡略化
今回のサンプルは最初のサンプルということで、どのようにObservableがデータを生成し通知するのかを見せるために、Observableのcreateメソッドを使って、データの生成から通知の処理を直接実装しました。そのため、購読解除時の処理や完了の通知などを実装者が気を付けて実装しないと適切な処理が行われない可能性があります。しかし、Observableの生成メソッドにはcreateメソッド以外にも、いろいろなメソッドが用意されており、すでに通知するデータが決まっている場合はObservableのjust
メソッドのように、通知するデータを設定するだけで、それらのデータを通知するObservableを生成するようなメソッドも用意されています。
また、subscribeメソッドの引数にはObserverの代わりに、各処理ごとの関数型インタフェースを渡すことも可能です。特に完了の通知やエラーの通知時の処理が不要な場合は、完了やエラーの通知を省略してデータ通知時の処理のみを受け取るsubscribeメソッドも用意されています。
メソッド | 概要 |
---|---|
subscribe(Action1<? super T> onNext) | データを受け取った際の処理のみ行う。完了の通知が来ても何もしない。 |
subscribe(Action1<? super T> onNext, Action1<Throwable> onError) | データを受け取った時とエラーの通知を受けた時の処理のみ行う。完了の通知が来ても何もしない。 |
subscribe(Action1<? super T> onNext, Action1<Throwable> onError, Action0 onCompleted) | データを受け取った時とエラーの通知を受けた時の処理と完了の通知を受けた時の処理を行う。 |
これらのことを踏まえて、最初のサンプルとほぼ同じことをする次のサンプルを作ってみましょう。まず、Observableのjustメソッドを使ってあいさつの言葉を通知するObservableを生成しています。次にObserverの代わりに各通知時の処理を行う関数型インタフェースを渡して通知を受け取った処理を実装しています。
// あいさつの言葉を標準出力するサンプル public static void main(String[] args) throws Exception { // あいさつの言葉を通知するObservableの生成 Observable<String> observableGreeting = Observable.just("Hello, World!", "こんにちは、世界!"); // Observableを購読し処理を開始する observableGreeting.subscribe( // Observableからのデータを受け取った際の処理 item -> { // 実行しているThread名の取得 String threadName = Thread.currentThread().getName(); // Observableからのデータをそのまま標準出力する System.out.println(threadName + ": " + item); }, // Observableからエラーを通知された際の処理 error -> { error.printStackTrace(); }, // Observableから完了を通知された際の処理 () -> { // 実行しているThread名の取得 String threadName = Thread.currentThread().getName(); System.out.println(threadName + ": 完了しました"); }); }
ちなみに最初のサンプルとの処理の違いは、justメソッドで生成されたObservableの場合、各データを通知するたびに購読解除の確認をしていることです。そのため、このサンプルだと仮に"Hello, World!"を通知した後に購読解除された場合、"こんにちは、世界!"は通知されません。それでは、このサンプルの実装について見ていきましょう。
まず、Observableの生成ではObservableのjustメソッドが使われています。このjustメソッドは引数に渡したデータを順に通知するObesrvableを生成するメソッドです。このjustメソッドを使えば、通知するデータを引数に渡すだけで、購読解除時の対応も完了の通知も実装されているObservableが生成されます。
次の変更箇所はsubscribeメソッドの引数にObserverではなく関数型インタフェースを渡しているところです。subscribeメソッドにはObserverを受け取るものの他に各通知の処理を行う関数型インタフェースを受け取るものがあり、中には完了時の処理やエラー時の処理が省略されているメソッドもあります。仮に今回のサンプルで完了時とエラー時の処理を行わず、データを受け取った時の処理のみを行うようにする場合は次のようにすることができます。
// Observableを購読し処理を開始する observableGreeting.subscribe( // Observableからのデータを受け取った際の処理 item -> { // 実行しているThread名の取得 String threadName = Thread.currentThread().getName(); // Observableからのデータをそのまま標準出力する System.out.println(threadName + ": " + item); });
RxJavaを使うメリット・デメリット
さて、今回のサンプルの場合、RxJavaを使って実装してもあまりメリットがあるようには見えません。仮に今回のサンプルが出力している内容を、RxJavaを使わず一般的な実装をすると次のようになるかと思います。
public static void main(String[] args) { // 実行しているThread名の取得 String threadName = Thread.currentThread().getName(); // データの出力 System.out.println(threadName + ": Hello, World!"); System.out.println(threadName + ": こんにちは、世界!"); // 完了時の出力 System.out.println(threadName + ": 完了しました"); }
このようにRxJavaを使った実装と比べると、かなりすっきりとした実装をすることができます。あまりに単純な処理の場合、RxJavaを使わず素直に実装したほうが何をしたいのかが分かりやすく管理もしやすいこともあります。
では、データの生成が複雑だったり受け取ったデータの処理が複雑だったりした場合はどうでしょうか? 先ほどの一般的な実装の場合、処理の中にデータを生成する処理("Hello, World!"と"こんにちは、世界!")とデータを消費する処理(System.out.println
)が混ざっています。そのため、もしデータの生産に関し何らかの変更があった場合、データを消費する側にも影響する可能性が高くなります。同様に、消費する側に変更があれば生産する側にも影響が出る可能性があります。そうなると、どちらかの処理に変更が入るたびに両方の処理の影響を考えないといけなくなります。これは実装だけではなくテストにも影響することを考慮しないといけません。
しかし、RxJavaの場合は、生産側と消費側の役割が明確に分離されています。そのため生産側と消費側はそれぞれの役割にだけ集中することができ、どちらかに変更があったとしても、もう片方に影響を与えることを極力減らすことが可能です。
また、この分離によって生産側はデータの生産をし消費側に渡すことまでが責任の範囲になるので、消費側の処理の結果を待つ必要がなくなります。そのため、消費側の処理が非同期で行っても生産側に影響を与えません。つまり、消費側が処理を行っている途中でも、生産側は先に処理を進めることが可能になります。

このように、RxJavaを使うことによって、簡単に役割を明確に分離できるため、それぞれが複雑な処理を行う際にお互いに与える影響を少なくする実装が可能になります。また、この分離によって非同期で処理を行うことが容易になります。
非同期処理の簡単な説明
それでは非同期の話が出たところで、簡単にRxJavaで行える非同期の処理について見ていきましょう。RxJavaには大きく分けて2つの箇所で非同期処理の設定ができます。一つは生産側であるObservableの処理に対して。もう一つは通知したデータを受け取った際の処理に対してです。前者はsubscribeOn
メソッドを使って設定でき、後者はobserveOn
メソッドを使って設定できます。
-
subscribeOn
メソッド:生産側であるObservableの処理をどのようなスレッド上で行わせるかの設定ができるメソッド。 -
observeOn
メソッド: 通知したデータを受け取った時の処理をどのようなスレッド上で行わせるかの設定ができるメソッド。
ちなみにobserveOn
メソッドが設定するのはOberverに対してのみでなく、通知するデータの変換やフィルタをするObservableのインスタンスメソッドを使って元となるObservableから新しく生成されたObservableにも設定されます。
RxJavaでは非同期処理を行うのにスレッドを直接的に使うのではなくrs.Scheduler
を使って設定します。このSchedulerクラスは抽象クラスであり、通常使われるSchedulerはrx.schedulers.Schedulers
のメソッドを使って対象のSchedulerを取得します。今回はSchedulerを生成するメソッドには、どのようなものがあるのかについての説明はしないので、興味ある方はSchedulersのAPIを参照ください。
それでは先ほどのあいさつの言葉を出力するサンプルに対し、Observerの処理を非同期で行うように変更してみましょう。今回はメインのスレッドと異なるスレッドで行うため、Schedulers.computation()
で取得できるSchedulerを使います。このSchedulerはスレッドプールにスレッドがあればそこから取得し、なければ新しいスレッドを生成し使用後にスレッドプールに戻すSchedulerです。デフォルトではスレッドプールに保持するスレッド数は実行環境の論理スレッド数になります。
// あいさつの言葉を標準出力するサンプル public static void main(String[] args) throws Exception { // あいさつの言葉を通知するObservableの生成 Observable<String> observableGreeting = Observable.just("Hello, World!", "こんにちは、世界!"); observableGreeting // 通知後の処理を非同期で行うためのSchedulerを設定 .observeOn(Schedulers.computation()) // ・・・(1) // Observableを購読し処理を開始する .subscribe(new Observer<String>() { // Observableからのデータを受け取った際の処理 @Override public void onNext(String item) { // 実行しているThread名の取得 String threadName = Thread.currentThread().getName(); // Observableからのデータをそのまま標準出力する System.out.println(threadName + ": " + item); } // Observableから完了を通知された際の処理 @Override public void onCompleted() { // 実行しているThread名の取得 String threadName = Thread.currentThread().getName(); System.out.println(threadName + ": 完了しました"); } // Observableからエラーを通知された際の処理 @Override public void onError(Throwable e) { e.printStackTrace(); } }); // 非同期で行われていることを確認するため出力する String threadName = Thread.currentThread().getName(); System.out.println(threadName + ": subscribed!"); // しばらく待つ Thread.sleep(1000L); // ・・・(2) }
main: subscribed! RxComputationScheduler-1: Hello, World! RxComputationScheduler-1: こんにちは、世界! RxComputationScheduler-1: 完了しました
この実行結果は環境によって異なる可能性(例えば"subscribed!"が"Hello, World!"の後に出力されるなど)がありますが、今回の場合は実行結果より、Observableが購読された後に非同期で行っているObserverの処理を待たずに"main: subscribed!"が出力されています。その間Observerは別のスレッド上で自身の処理を行い、受け取ったデータを出力して最後に完了時の出力をしています。これがもし、(1)のSchedulerの設定がされていない場合はデータ通知後の処理は非同期で行われず、どの環境で実行しても次のような結果になります。
main: Hello, World! main: こんにちは、世界! main: 完了しました main: subscribed!
それでは非同期で処理を行うことによるメリットについて簡単に考えてみましょう。非同期処理のメリットはAndroidなどのGUIプログラミングを行う際に顕著になります。もし、Observableをテキストの入力部品と見なし入力値の値が変わるたびにデータを通知するとした場合、非同期で処理を行っていないと、データを通知した後にそのデータが処理されるまで画面がフリーズしてしまいます。データを受け取った後の処理がすぐに終わるような場合はフリーズしている時間も一瞬のため気にもなりませんが、これが時間がかかるような処理の場合、画面が再び処理をできるようになるまで時間がかかることになります。つまり処理が終わるまで操作が行えず、かかる時間によっては大きな問題になります。
このように非同期で処理を行えることは、ある処理を行っている最中に別の処理を行えることができるというメリットがあります。しかし、このように複数のスレッドを使ってそれぞれの処理を同時に実行できるということは異なるスレッド上の処理に関しては順番が保証されない、つまり、今回のサンプルだとメインスレッドから出力される"subscribed!"と別スレッドから出力される"Hello,World!"などの通知データのどちらが先に出力されるのかは保証されません。このことは特に複数のスレッドから一つのインスタンスにアクセスする場合に大きな問題になります。この他にもいろいろなメリットや、もちろんデメリットもありますが、とりあえずは非同期処理を行うことで、このようなメリットやデメリットがあることを覚えておきましょう。
また、今回のサンプルでは(2)のように処理の最後にThread.sleep(1000L);
で別スレッドの処理が終わるまでmainのスレッドを待機させています。もしこの処理を入れていないと、次のように非同期で行っているデータを受け取った後の処理を行えずにプログラムが終わってしまう結果になる可能性が高いです。
main: subscribed!
これはObserverが処理を行おうとしている間にmainメソッドの処理が最後まで実行されてしまい、プログラムを終了しているためです。そのため、非同期で行っている処理を待たずにプログラム自体が終了してしまうことが困る場合には注意する必要があります。
ただし、今回のサンプルのように処理の最後でThread.sleep
を使ってスレッドを止めたり、RxJavaでデバッグやテスト用として用意されている処理が終わるまで待つようなメソッドを使って処理を止めることは、あまり良いプラクティスではありません。もし、このように処理を止める必要がある場合は、そもそも非同期で処理を行う必要があるのかどうかも含め、アプリケーションの設計を見直したほうが良いでしょう。
まとめ
今回は現在の安定板であるRxJava 1.xを使った簡単なサンプルを作成しました。次回は、今回のサンプルを元に次のメジャーバージョンとなるRxJava 2.xを使った場合の実装を見ていきます。