対象読者
- Java経験者(初心者可)
- RxJava未経験者
- リアクティブプログラミング未経験者
RxJavaとは
RxJavaは、Javaでリアクティブプログラミングを行うためのライブラリです。このライブラリは軽量であり、また、他のライブラリに対する依存がないのでRxJavaのjarをパスに通すだけで使えるようになります。対応しているJavaのバージョンは6からで、さらにAndroidもバージョン2.3(Gingerbread)からサポートしています。
そして、RxJavaは厳密には関数型リアクティブプログラミング(Functional Reactive Programming)ではないのですが、引数に関数型インターフェースを受け取るメソッドを使っているので、関数型プログラミングのように関数を組み合わせていくことで、データの変換やフィルタなどを柔軟に組み立てれるようになっています。
リアクティブプログラミングとは
リアクティブプログラミングとは、データが流れるように来ること(ストリーム)に着目し、データを受け取るたびに関連したプログラムが反応(リアクション)して処理を行うようにするプログラミングの考え方です。
このデータのストリームとは、例えばGPSを利用して位置情報が変わるたびに送信されるデータの流れをイメージしてもらうと分かりやすいかもしれません。移動し位置情報が変わるたびにデータが送信され、立ち止まればデータの送信が止まるように、生成されるデータをすべてまとめて送信するのではなく、個々のデータが生成される度に順に送信していきます。このようなデータの流れのことをデータストリームと言います。
このデータストリームはイベントとも関連していて、文字列を入力するという行為は、入力したデータが順に生成されているとみることができます。例えば「abc」という入力は、入力のイベントが発生するたびに
- 「a」
- 「ab」
- 「abc」
と、データが発生していると見なすことができます。同様にボタンを押す行為に関しても、具体的なデータがなくても「ボタンを押した」というデータが生成されていると見なすことが可能です。ボタンを複数回押した場合、その押した数だけ「ボタンを押した」というデータが生成されていることになります。つまりイベントも、発生するたびにデータを発信するデータストリームとして扱うことが可能になります。
リアクティブプログラミングでは、このようなデータストリームから流れてくるデータに対し、そのデータを受け取ったプログラムが、そのたびに処理をしていく作りになっています。つまり、プログラムが必要なデータを自分から取得し処理をするのではなく、送られてきたデータを受け取るたびに反応して処理をする(リアクティブな処理をする)ようなプログラムにすることがリアクティブプログラミングになります。
例えば、商品価格と税率から税額を計算するプログラムがあったとします。リアクティブプログラミングでない場合、商品価格と税率の値を取得しただけでは何も起こりません。値を取得した後に「税額の計算処理を行う」というアクションが実行されて初めて税額が算出されます。そして、税額を計算した後に商品価格の値が変わったとしても、再び計算処理のアクションが実行されない限り税額の値は変わりません。

しかし、これがリアクティブプログラミングの場合、商品価格の値が変化するたびに税額が計算され表示されるようになります。これは商品価格の値が変化するたびに税額を計算するプログラムに値が送信され、プログラムが値を受け取ると税額の計算処理を行い、その計算結果が税額に反映されるためです。そのため、商品価格の値が変わるたびに商品価格が通知され、税額が自動的に算出されるようになります。

これだけだと、商品価格にListenerをつけて自動で計算できるようにしたプログラムとどう違うのか疑問に思われるかもしれません。この場合、リアクティブプログラミングなのかどうかは、何が処理を行うのかの意識の違いになるかと考えられます。例えば、商品価格が変更されListenerが反応して、商品価格が税額を計算して値を反映しているという意識の場合は、リアクティブプログラミングの考え方にはなりません。

これをListenerが反応したことによって、税額に新たな商品価格が渡され、税額にて計算のプログラムが実行されて、その結果を税額自身に反映していると意識している場合は、リアクティブプログラミングの考え方になります。

そのため、今回のような簡単なプログラムの場合は、前者も後者もソースコード上ではまったく同じになる可能性はあります。しかし、このプログラミング上の考え方の違いによって、どのプログラムが何をするのかの範囲が変わってきます。行うべき処理が複雑になってきたり、商品価格の変更によって値が変わる項目が増えてきたりするにつれ、どこで処理を行うのかの比重が変わり、ソースコード上にも違いが現れてくるでしょう。
リアクティブプログラミングでは、データの生産側(今回の例では商品価格)はデータを渡すところまでが自身の責任になります。そのため、データの生産側はデータの消費側(今回の例では税額)の処理についてはそのデータを使って何をしているのかを意識しなくてもよくなります。そしてそのことは、データの生産側は消費側で何をしているのかは関係がなくなり、消費側の処理を待つ必要もなくなるので、データの通知をした後に、消費側の処理が途中であっても、すぐに生産側の次の処理を行えます。こうすることで非同期処理を容易に実現できるようになります。
また、システムの構築という点で見ても、リアクティブプログラミングの考え方はマイクロサービスなどの分散システムで稼働するプログラムにも向いており、現在大きな注目を浴びているプログラミング手法の一つになっています。
ただし、紛らわしいことに、「リアクティブシステム」と言った場合、「リアクティブプログラミングで実装されたシステム」のことを表さないことに注意しないといけません。リアクティブシステムとは、簡単に説明すると、メッセージを送ることで処理を行い、状況に応じてスケールアウトやスケールインが自動で行われ、障害の耐性を高めることによって、常に迅速な応答を得ることができるシステムのことを言います。そのため、このリアクティブシステムはプログラムだけではなくインフラの条件も必要となります。そして各サービスに対しては別にリアクティブプログラミングで実装していなくても、リアクティブシステムの構築は可能でもあります。
RxJavaの概要
RxJavaは、もともとは2009年にMirosoftで.NET Frameworkの実験的なライブラリ「Reactive Extensions」(略して「Rx」)として公開され、2012年にオープンソース化されました。それを後にNetflixがJavaに移植しオープンソースとして公開したものがRxJavaになります。
現在、このReactive Extensionsを扱うライブラリはReactiveXとしてオープンソースプロジェクト化し、Javaや.NETだけでなくJavaScriptやSwiftなどさまざまなプログラミング言語に対応したライブラリを提供しています。
現在、RxJavaは次のメジャーバージョンとしてReactive StreamsのAPIに準拠したRxJava 2.xを開発中です。RxJava 2.xはもともとJava 8以降を対象にしていたのですが、途中でJava 6以降およびAndroid 2.3以降も対象に含まれるよう変更されました。こちらはもうすぐ正式リリース予定で現在発表されているスケジュールは次のようになっています。
- 2016/08/25: RC(Release Candidate)版1のリリース
- 2016/09/23: RC(Release Candidate)版2のリリース
- 2016/10/21: RC(Release Candidate)版3のリリース
- 2016/10/29: 安定版のリリース(Stable Release)
Reactive Streamsとはリアクティブにデータを処理をするための標準化を行っているもので、Java用の標準化したInterfaceを提供しています。
このReactive Streamsが提供しているInterfaceはRxJava 1.xとは異なりますが、RxJavaで用意されているReactive Streamsに変換するためのライブラリであるRxJavaReactiveStreamsを使うことでRxJava 1.xでもReactctive Streamsと連携することは可能です。ちなみに1.xでReactive Streamsに対応していない理由は、単にRxJavaの方が先に存在し、しばらく経った後でReactive Streamsが作られたためです。
このようにRxJavaは2.xのタイミングで下位互換性がない大幅な変更が入ります。しかし、RxJava 2.xのREADMEによると、しばらくの間は2.xと1.xの両方を並行して開発するそうです。そのため1.xで開発されているものを(どこかのタイミングで変えないといけないにしても)急いで2.xに変える必要はないかと思われます。また、APIが変わっても設計の本質的なところは1.xと大きくは変わらないため、1.xを知っておけば2.xについても簡単に理解できるかと思われます。今回は執筆時点ではまだRC版も出ていない状況なのでRxJava 1.xでの内容になります。連載を続けていくうちに2.x系に変わる予定です。
また、RxJavaのAPIの中には、まだ開発途中のものや実験的に入れているものもあります。それらのAPIはリリースするたびに前バージョンとの互換性がない変更が入る可能性が高いものであり、それらのAPIに依存したプログラムを実装しないほうが安全です。次のアノテーションがついているものが開発途中や実験的なAPIになります。
- @Beta
- @Experimental
そのほかにもRxJavaライブラリで内部的に使われるためのrx.internalのパッケージがあり、ここにあるAPIもリリースの際に破壊的な変更をされる可能性が高いので、このAPIに依存したプログラムも実装しないほうが安全です。
RxJavaの特徴
RxJavaの特徴の一つとして、デザインパターンの一つであるObserverパターンをうまく拡張している点があります。特にデータを生産する側とデータを消費する側に分けることで、無理なくデータストリームを扱うことができ、リアクティブプログラミングで実装できる作りになっています。
また、RxJavaは非同期の処理が行いやすい点も特徴です。開発者はまったく非同期処理について考えなくてよいわけではありませんが、Observable契約というRxJavaの開発ガイドラインに従って実装している限り、直接スレッド管理を行うような煩わしさから解放されます。また、同期処理であろうが非同期処理であろうが実装方法が大きく変わらない点もポイントの一つになります。
そして、RxJavaでは関数型プログラミングの影響を受けており、処理の多くを関数型インターフェースを受け取るメソッドを使って実現しています。このことは入力と結果のみさえ決めてさえあれば具体的な処理に関しては関数型インタフェースを実装する開発者に委ねられるため、実装上の自由度が増します。ただし、このような処理には関数型プログラミングの原則に従って副作用を発生させないような作りにしないといけません。副作用を発生させる処理とは、オブジェクトの状態を変更したり再代入を行ったりなどして、処理の外部でも参照可能なオブジェクトに対して何らかの変化を加えることです。また、ファイルやデータベースの中身を変えるようなことも副作用としてみなされます。関数型プログラミングにおいて、同じ入力を受け取ったら毎回同じ結果を返し、かつ、その入力値や処理の外部に対して何も変化を起こさないことを原則としています。RxJavaでは基本的に副作用を発生させるような処理を行うのはデータを生産する側ではなく、データを消費する側で行った方が良いでしょう。
また、この関数型インターフェースの処理において副作用を発生させないことは、複数スレッドから共有されるオブジェクトがないことになり、スレッドセーフを担保できることになります。このことはRxJava上で同期の処理を行っているものを途中で非同期に変更するとしても、すでに実装されているロジックに対しての変更が発生しません。開発者が行うことはどの部分を非同期にするのかの設定処理だけになります。
RxJava(1.x)の基本的な仕組み
次期バージョンである「RxJava 2.x」と執筆時点で安定版とされている「RxJava 1.x」とはクラス名やメソッド名などが異なるので、今回はRxJava 1.xについて解説していきます。本稿での「RxJava」はRxJava 1.xのことを指します。それではRxJavaについて見ていきましょう。
RxJavaの基本的な仕組みは、データを生産し通知する「Observable」と通知されたデータを受け取りそのデータを消費する「Observer」の2つの関係で成り立っています。このObservableをObserverが購読(subscribe)することで、Observableが通知したデータをObserverが受け取ることができるようになります。
- Observable: データを通知するオブジェクト
- Observer: データを受け取り処理をするオブジェクト
そして、Observableが問題なく全てのデータを通知し終えると、すぐに正常終了をしたことをObserverに通知します。もし途中でエラーが発生した場合はエラーオブジェクトをObserverに通知し異常終了したことを伝えます。RxJavaではObservableが正常終了もしくは異常終了したことを伝えた後は何も処理を行わない前提になっています。つまり、正常終了したことを通知した後に他のデータや異常終了を通知することはできず、異常終了を通知した後も他のデータや正常終了をしたことを通知することはできません。
そのため「終了」だけだと紛らわしくなるので、この連載では正常終了したことを通知することを「完了」を通知すると言い、異常終了したことを通知することを「エラー」を通知すると言うようにします。そして「終了」が通知されるということは、「完了」もしくは「エラー」のどちらかが通知されることを言うようにします。つまり「完了」を通知するということは全てのデータが問題なく通知され正常終了したことを表し、「エラー」を通知するということはObservableの処理中に何らかのエラーが発生し異常終了したことを表します。「終了」を通知すると表現した場合は「完了」もしくは「エラー」のどちらかかを通知することを表します。
- 完了:正常終了
- エラー:異常終了
- 終了:完了(正常終了)もしくはエラー(異常終了)
このObservableとObserverとの基本的な処理の流れは次のようになります。
- ObservableをObserverが購読(subscribe)する。
- Observableの処理を開始する。
- Observableがデータを生産しObserverに対しそのデータを通知する。
- データを受け取ったObserverはそのデータを使って処理を行う。
- Observableは全てのデータを通知したら完了の通知を、途中でエラーが発生したらエラーを通知し、処理を終了する。

Observableには、Observerから購読されると必ず処理を開始する"Cold"なObservableと、Observerによる購読に関係なく処理を開始することもある"Hot"なObservableがあります。ここではCold Observableについて説明していますが、ObservableがObserverにどのようにデータを通知するかの大枠だけ見ると、ColdでもHotでも基本的にはほぼ同じ処理になります。
Observable
Observableの通知処理にはObservable契約というガイドラインがあり、次のようなことが書いてあります。
- 0個以上のデータをシーケンシャル(逐次的)に通知する。
- 全てのデータを通知し終えたら完了を通知する。
- 途中でエラーが発生したらエラーを通知する。
- 完了もしくはエラーを通知したらそれ以降の通知はしない。
それでは実際にデータを通知するObservableを生成する例を見てみましょう。Observableの生成方法はこの他にもありますが、まずは基本的な仕組みがわかりやすいcreateメソッドを見ていきます。
Observable.create(new OnSubscribe<T>() { @Override public void call(Subscriber<? super T> subscriber) { try { …通知するデータの生成処理など // データの通知 subscriber.onNext(データ); subscriber.onNext(データ); subscriber.onNext(データ); … // 完了したことを通知する subscriber.onCompleted(); } catch (SomethingException e) { // エラーが発生したことを通知する subscriber.onError(e); } } })
ここではObservableのstaticファクトリメソッドであるcreateメソッドを使ってObservableを生成しています。ちなみにstaticファクトリメソッドを簡単に説明すると、対照のクラスのインスタンスを返すメソッドで、コンストラクタからの生成に比べ細かい設定が隠蔽できたりなどの利点を持つ、よく使われるプログラミングのテクニックの一つです。このcreateメソッドの引数であるrx.Observable.OnSubscribeは関数型インタフェースであり、実装されるべきメソッドを一つだけ持っています。このOnSubscribeのメソッドを実装することで、Observableが何をいつ通知するのかを定義することができます。
このOnSubscribeのcallメソッドでは引数にObserverへの通知を行うSubscriberを受け取ります。このSubscriberにはObserverにデータの通知および完了やエラーの通知を行うメソッドが用意されています。
メソッド | 概要 |
---|---|
onNext(T t) | Observableのデータを通知します。 |
onCompleted() | Observableの処理が完了(正常終了)したことを通知します。 |
onError(Throwable e) | Observableの処理中に発生したエラーを通知します。 |
まず、Observerへのデータの通知にはSubscriberのonNextメソッドを使います。このonNextメソッドの引数に通知するデータを渡すと、Observerにそのデータを通知します。通知するデータが複数ある場合はSubscriberのonNextメソッドに順番にデータを渡すことで、Observerにもその順番通りにデータが通知されます。
ここで重要なことは、Observableがデータを通知する際には、同じスレッドもしくは同期がとられた状態でSubscriberのメソッドが呼ばれることを前提として設計されている点です。例えば、Observableの内部で複数のスレッドを生成し、同期をせずに別々のスレッドからSubscriberのonNextメソッドにデータを渡すことは安全でない通知と見なされます。

そのため、もし、データの生成を複数スレッドから行ったとしても、SubscriberのonNextメソッドに通知するデータを渡す際は同期してから渡さなくてはいけません。

しかし、RxJavaを使う上で、Observableにこのような同期処理を開発者自身が実装することはバグを生み出したり、非効率な同期を行いパフォーマンスを逆に下げたりする危険性があるので避けた方がよいでしょう。そのような処理が必要な場合はそもそもの設計を見直すか、RxJavaに用意されている複数のObservableを一つに結合するメソッドなどをうまく使って処理を行うようにしたほうが安全です。この結合するメソッドを使う場合、各Observableが異なるスレッド上で処理をしていたとしても、結合されたObservableは内部で同期を取り安全に通知を行うので、開発者が同期についての実装を行う必要がなくなります。
そして、Observableが全てのデータを通知し、処理が最後まで正常に行われたら、SubscriberのonCompletedメソッドを呼んで、Observerに処理が完了したことを通知します。このonCompletedメソッドは処理が正常に完了したことを示すメソッドであり、それ以降データは通知されない前提になっています。ですので、SubscriberのonCompletedメソッドを呼んだ後は何も処理をせず終了するようにしてください。もし、onCompletedメソッドを呼んだ後に何らかの後処理をした場合、そこでエラーが発生しても、そのエラーはObserverには通知されず、何もそのエラーに対する処理が行われない可能性が高くなります。
もし、Observableの処理の途中でエラーが発生した場合は、SubscriberのonErrorメソッドを呼びます。このonErrorメソッドはObserverにエラーが発生したことを通知するメソッドであり、引数にエラーのオブジェクト(Throwable)を渡すことでObserverにそのエラーのオブジェクトが通知されます。RxJavaではSubscriberのonErrorメソッドが呼ばれることはObsrvableの処理が異常終了したことを示し、その後はonCompletedメソッド同様にObservableでは処理を行わないことが前提になっています。
また、このサンプルでは分かりやすさを優先するため割愛しましたが、本来はSubscriberで通知処理を行う前に購読解除(unsubscribe)されていないか確認をしないといけません。つまり、Observableが通知を行う際に、通知を受け取るObserverから購読解除されて通知を行う相手がいなくなっていれば、それ以上の通知を続けることは不要になります。
そして、Observableには大きく分けて2つの種類があります。一つは"Cold"なObservabeと呼ばれるもので、もう一つは"Hot"なObservableと呼ばれるものです。この二つの違いを簡単に説明すると、"Cold"なObservableは必ず一つのObserverにしか通知しないのに対して、"Hot"なObservableは複数のObserverに通知することが可能な点です。これは"Cold"なObservableが処理の途中で別のObserverに購読された場合、既に行っている処理のタイムラインとは別のタイムラインを生成して新しく購読したObserverに通知を行う性質によります。そのためObserverが購読するたびに新しい処理を最初から始めるので、"Cold"なObservableは必ず一つのObserverにしか通知できません。これに対し"Hot"なObservableが処理の途中で別のObserverに購読されたら、既に実行されている処理のタイムラインを通じて新たなObserverにデータを通知します。つまり、前から購読しているObserverにも後で購読し始めたObserverにも同じデータを通知します。そのため、"Hot"なObservableは複数のObserverに対して同じデータを通知することが可能になります。
Observer
ObserverインタフェースはObservableの通知を受けて、通知の種類ごとの処理を行うためのインタフェースです。Observerは次の通知を受け取り、各通知ごとのメソッドを持っています。
- データ(onNext)
- 完了(onCompleted)
- エラー(onError)
new Observer<T>() { @Override public void onNext(T item) { …Observableから通知されたデータ(item)を受け取った際の処理 } @Override public void onCompleted() { …Observableから完了の通知を受け取った際の処理 } @Override public void onError(Throwable e) { …Observableからエラーの通知を受け取った際の処理 } }
ObserverのonNextメソッドは、Observableが通知したデータを受け取り、そのデータを使って処理をするメソッドです。onNextメソッドの引数はObservableのcreateメソッドの内部でSubscriberのonNextメソッドに渡したデータが通知されます。Observerはこのデータを受け取って何らかの処理を行います。
ObserverのonCompletedメソッドは、Observableが完了を通知した際の処理を行うメソッドです。Observableのcreateメソッドの内部にてSubscriberのonCompletedメソッドを呼ぶことで、ObserverのonCompletedメソッドが実行されます。
ObserverのonErrorメソッドは、Observableからエラーが通知された場合に処理を行うメソッドです。Observableからのエラー通知は2種類あり、一つはObservableのcreateメソッドの内部にて明示的にSubscriberのonErrorメソッドを呼んだ場合で、もう一つはcreateメソッド内でcatchされないエラーが発生した場合です。この時、ObserverのonErrorメソッドの引数にはObservableが通知したエラーのオブジェクトが渡されます。
しかし、Observable内で発生するcatchされないエラーの中にはStackOverflowErrorやVirtualMachineErrorのように継続すること自体が好ましくないエラーもあります。そのようなエラーは通知ではなく、そのエラーをそのままスローすることで処理を終了します。
Observableのsubscribeメソッド
ObservableとObserverの準備ができたらObservableのsubscribeメソッドの引数にObserverを渡してObservableを購読します。そうすることでObservableが通知するデータをObserverが受け取ることが可能になります。また、subscribeメソッドを呼ぶと戻り値としてSubscriptionを受け取ります。
Subscription subscription = observable.subscribe(observer);
subscribeメソッドにはObserverを引数として受け取るもののほか、通知時の処理が実装された関数型インタフェースを引数にとるものなど様々なメソッドが用意されていますが、ここでは基本的なObserverを引数に取るものを見ていきましょう。
subscribeメソッドにObserverを渡すと、ObservableはそのObserverに対して生成したデータを通知できるようになります。ただし、subscribeメソッドを呼ぶことによってObservableが処理を開始するかどうかは、そのObservableが"Cold"か"Hot"かによって変わってきます。
ColdなObservableの場合、subscribeメソッドが呼ばれると、Observableは処理を開始します。また、Observableが処理を行っている最中に、別のObserverがそのObservableに対してsubscribeメソッドを呼んだ場合、既に行っている処理はそのまま継続したまま、新しい処理をその別のObserverに対して最初から開始します。つまり、Observerが購読するたびに新しい通知の処理ができることになります。
例えば1、2、3と順にデータを通知する"Cold"なObservableに対して、あるObserverが購読したとします。そして、そのObservableが1を通知した後に別のObserverがそのObservableを購読した場合、最初のObserverは順に1、2、3とデータを受け取り、後で購読したObserverは購読したタイミングから順に1、2、3とデータを受け取ることになります。
これに対しHotなObservableの場合、Observableの処理を開始するのかどうかはHot Observableの種類によって異なり、開始するものもあれば開始しないものもあります。また、既に処理を開始しているHot Observableに対し別のObserverがsubscribeメソッドを呼んだ場合、そのObservableは最初から処理をするのではなく、処理の途中からそのObserverにも通知を行います。
例えば先ほどの1、2、3と順にデータを通知するObservableが"Hot"なObservableだった場合、そのObservableに対してObserverが購読したとします。そして、そのObservableが1を通知した後に別のObserverがそのObservableを購読した場合、この"Hot"なObservableが通知したデータをキャッシュしていないタイプだと、最初のObserverは1、2、3とデータを受け取り、後で購読したObserverは2、3とデータを受け取ることになります。
そしてObservableのsubscribeメソッドは戻り値にSubscriptionを返します。これは他の引数をとるsubscribeメソッドも同様でSubscriptionを戻り値として返します。このSubscriptionは途中で購読を解除するためのunsubscribeメソッドを持っていて、unsubscribeメソッドを呼ぶことでObservableの処理の停止を促せます。促せるというのはunsubscribeメソッドを呼んでもObservableが途中で処理を止めるのが危険な場合もあるため、購読の解除をリクエストしても安全な状態になるまで処理が続くこともあるためです。そして、unsubscribeメソッドを呼ばれて処理を止めたObservableは基本的に完了もエラーも通知しません。
また、Observableのcreateメソッドのように、実装者が直接どのような通知を行うのかを実装したObservableを生成している場合、購読解除された際の処理がプログラムに記述されていないと、unsubscribeメソッドが呼ばれても処理は中断されず最後まで通知が行われてしまいます。
このことは、例えば、Observableが膨大の数のデータ通知している場合に、unsubscribeメソッドを呼んでその処理を途中で止めようとしても、そもそも購読解除時の処理が実装されていないので完了もしくはエラーが発生するまで、このObservableは処理を続けるようなことになってしまいます。
このようなことを避けるため、Observableが購読解除されている状態なら処理を中断し、それ以上の処理を行わないように実装しておく必要があります。購読解除されたかどうかはSubscriberのisUnsubscribedメソッドで確認することができます。trueが返る場合はObservableが購読解除されたことを示します。購読解除時の処理を追加した場合は次のようになります。
Observable.create(new OnSubscribe<T>() { @Override public void call(Subscriber<? super T> subscriber) { try { while(true){ // 購読解除されているなら処理をやめる if (subscriber.isUnsubscribed()){ return; } …通知するデータの生成処理など // データを通知する subscriber.onNext(item); // 条件によってループを抜ける if (何らかの条件){ break; } } // 購読解除されていない場合だけ実行 if (!subscriber.isUnsubscribed()){ // 完了したことを通知する subscriber.onCompleted(); } } catch (Exception e) { // 購読解除されていない場合だけ実行 if (!subscriber.isUnsubscribed()){ // エラーが発生したことを通知する subscriber.onError(e); } } } })
また、一つのObserverが複数のObservableのデータを必要とする場合は、Observableを連結し新たなObservableを生成することで一つのObserverで対応できるようになります。Observableにはそのような連結を行うためのメソッドがあります。
例えば、商品の価格と税率から税額を計算するような場合、ObservableのcombineLatestメソッドを使って、次のような実装で商品価格の変化や税率の変化に対応することが可能になります。
Observable<BigDecimal> priceObs = …商品価格の値を通知するObservable Observable<BigDecimal> taxRateObs = …税率の値を通知するObservable // 2つのObservableのデータから新しいデータを通知するObservableを生成 Observable .combineLatest(priceObs, taxRateObs, // 結合するObservable // 各Observableから受け取ったデータを使って処理をする // 関数型インタフェース (price, taxRate) -> price.multiply(taxRate) .setScale(0, RoundingMode.DOWN)) // 購読開始 .subscribe( new Observer<>(){ @Override public void onNext(BigDecimal tax){ 関数型インタフェースの計算結果を受け取り処理を行う } …完了時とエラー時の処理は省略 });
ObservableのcombineLatestメソッドは、引数に渡された複数のObservableのデータを使って新しいデータを生成し、その新しいデータを通知するObservableを生成します。引数に渡されたObservableのどれかがデータを通知したタイミングで、各Observableが最後に通知したデータを引数として関数型インタフェースに渡します。そこで定義した処理を行った結果がデータとして通知されます。
他にも複数のObservableを連結させるメソッドがRxJavaには用意されています。これらのメソッドを使えば一つのObservableになるので、その一つになったObservableに対しObserverが購読することで、複数のObservableの通知が必要であっても、一つのObserverで通知を受け取ることが可能になります。
Observableのメソッド
ObservableのJavaDocを見るとわかるのですが、Observableのメソッドの多くはObservableを返します。これはObservableの多くのメソッドがデータのフィルタや加工を行うメソッドだからです。これらのメソッドが呼ばれると、元のObservableからデータのフィルタや加工を行った新しいObservableが生成されます。そして、新規に生成されたObservableは元のObservableが全ての処理を行ってから生成されるのではなく、メソッドを呼んだタイミングで生成され、元のObservableが各データを通知するタイミングで変更されたデータを通知します。

この特性よりメソッドを数珠つなぎ(メソッドチェーン)にして元のObservableから必要なデータを通知する新しいObservableを生成することができます。これはJava 8のStreamを使ったことがある人だとイメージがしやすいかもしれません。
例えば、次のサンプルでは1から10までの数値を通知するObservableからデータを偶数のみに絞り、そのデータを括弧でくくり"[データ]"の文字列に変換したものを通知するObservableを生成しています。このサンプルでは、まだ説明していないメソッドやラムダ式で記述しているところがありますが、ここでは詳細については気にしないで、メソッドチェーンを使って最終的なObservableを生成していることに注目してください。
Observable<String> observable = Observable.range(1, 10) // ① 1~10までの数値を通知するObservable .filter(value -> value % 2 == 0) // ② 偶数のみにする .map(value -> "[" + value + "]"); // ③ "[データ]"の文字列にする // 購読開始 observable.subscribe(System.out::println);
これを実行すると次の結果になります。
[2] [4] [6] [8] [10]
これを段階的に説明すると、まず①ではrangeメソッドを使って1から始まるデータを10個まで通知するObservableを生成しています。Observableが通知するデータは次のものになります。
1、2、3、4、5、6、7、8、9、10
次に②で受け取ったデータをvalue % 2 == 0
で判定を行い、偶数のもののみにフィルタをしています。ここで生成されるObservableが通知するデータは次のものになります。
2、4、6、8、10
最後に③で受け取ったデータを"[" + value + "]"
でデータの両端に括弧をつけた文字列に変換しています。ここで生成されるObservableが通知するデータは次のものになります。
"[2]"、"[4]"、"[6]"、"[8]"、"[10]"
このデータをObserverが受け取ることで実行結果の内容が出力されます。
また、関数型インタフェースを実装する際の注意点として、受け取ったデータ(各関数型インタフェースで引数として受け取るデータ)に対し、そのデータの状態を変えたり、Observableの外部の状態を変えたりするような副作用を起こさないようにしなければなりません。関数型インタフェースの実装の原則として、関数型インタフェースに対し引数に同じ値が与えられたら同じ結果を毎回返し、かつ引数やObservableの外部に対して何も変えないようになっています。そのため、Observable内で扱うデータは基本データ型か生成後にオブジェクトの中身が変わらないイミュータブル(Immutable)なオブジェクトを使うことが好ましいです。
マーブルダイアグラム(Marble Diagram)
RxJavaのJavaDocやReactiveXのドキュメントを読んでいると、よくカラフルな丸や四角などの図形や矢印が入った図が出てきます。これはマーブルダイアグラムと言い、リアクティブプログラミングにおいて時間とともにどのようにデータが受け渡され、どのように変化していくかを表現している図です。主にObservableがメソッドを実行すると、時間とともにデータがどのように変化していき新たなObservableを生成するのかを説明するのに使われます。特にRxJavaでは、データストリームとして流れるように来るデータがどのように変化するのかが重要なことです。そのため、時間経過に伴うデータの変化を可視化した図があることによって、文章に加えてビジュアルからの情報も入手できるので、何が行われているのかがさらにわかりやすくなります。
例えばObservableが持つfilterメソッドの場合を見てみましょう。filterメソッドは元のObservableが持つデータから指定した条件のデータのみを持つ新たなObservableを生成するメソッドです。filterメソッドが引数に受け取るものは、そのデータを受け入れるか除外するかを判定する関数型インタフェースです。その関数型インタフェースがデータを判定しtrueになったもののみが新たなObservableから通知されるようになります。

このマーブルダイアグラムでは、filterメソッドによって元のObservableから形が丸のデータのみを通知する新たなObservableが生成されることを表しています。まず、上部の左から右にひかれている横の矢印(A)は元のObservableの時間経過(タイムライン)を表し、左から順にデータが通知されることを表しています。そしてそのタイムライン上にある丸や四角などの図形(B)は元のObservableが通知するデータを表し、下に向かう破線の矢印(C)はデータがメソッドを通して処理されるデータの変換やフィルターの様子を表しています。そして中段にある四角に囲まれた枠はメソッド(D)を表し、その下の左から右に伸びる矢印が新たに生成されるObservableのタイムライン(E)になります。そのタイムライン上にあるデータ(F)が新しいObservableから通知されるデータを表しています。そして各タイムラインの右端にある短い縦線(G)は完了を通知することを表しています。
今回のマーブルダイアグラムの例では、様々な形のデータを持つObservableからfilterメソッドを通して、新たに生成されるObservableには形が丸のデータのみを持つようになることを表しています。そして、新しいObservableでは形が丸でないデータの場合は通知されず、そのまま時間が過ぎていき、再び元のObservableが形が丸のデータを通知するまでデータの通知が行われません。そして最後に元のObservableが完了を通知するタイミングで新しいObservableも完了の通知を行うことを表しています。つまり、新しいObservableは形が丸のデータをすぐに通知したり、全ての形が丸のデータを通知してすぐに完了を通知したりするわけではなく、元のObservableがデータや完了を通知するタイミングで初めて新しいObservableも通知を行うことを表しています。
このようにマーブルダイアグラムでは左から右に向かって流れていく時間を表現し、上から下へ向かってデータがメソッドを通って、どのように変換されるかを表現しています。
もし、Observableの処理が完了ではなくエラーを通知する場合は、その個所にバツ印を記述し何らかのエラーが通知されることを示します。例えばonErrorReturnメソッドでは、元となるObservableに何らかのエラーが発生した場合に、引数に渡されたデータを通知し完了するObservableを生成するメソッドです。このような場合、次の図のようにタイムライン上にバツ印を記述しエラーが通知されることを表します。

それでは次に複数のObservableからなる場合のマーブルダイアグラムを見てみましょう。Observableのzipメソッドは複数のObservableからデータを取得し、そのデータを使って生成した新しいデータを持つObservableを生成します。上の2つが元となるObservableのタイムラインで下が結果のObservableのタイムラインです。

元となるObservableが複数の場合、各Observableがデータを通知するタイミングがそれぞれ異なる場合があります。どのタイミングになるかはメソッドによりますがzipメソッドの場合は各Observableの順番を合わせます。今回のマーブルダイアグラムの場合、次のようなデータを生成しています。
- 1番目に通知される「丸」のデータと「灰色」のデータから「灰色の丸」のデータを生成
- 2番目に通知される「四角」のデータと「黒」のデータから「黒の四角」のデータを生成
- 3番目に通知される「菱型」のデータと「白」のデータから「白の菱型」のデータを生成
そのため、結果のObservableが持つデータは後で通知されるObservableのタイミングに合わせて生成されています。
このようにマーブルダイアグラムは時間経過とともにどのような結果が起こるのかの例を表した図がJavaDocを始めとしたドキュメントの多くに使われています。特に複数のタイムラインを扱うものは文章だけだとわかりにくいメソッドもあるのでマーブルダイアグラムを理解しておくと仕様の理解が進みます。また、RxJavaを使ったアプリケーションの設計や独自のObservableを作る際にマーブルダイアグラムを描いてみると、時間の遷移とともに何が起こるのかを図で表現でき、頭の中で考えていたことも可視化され精査できるので、複雑なことをする際はマーブルダイアグラムを描いてみることを勧めします。
まとめ
今回はリアクティブプログラミングとRxJavaの概要について紹介しました。ただ今回はリアクティブプログラミングの概念やRxJavaの概要についてフォーカスしたため、実際にRxJavaを使った実装は行っていません。そのため、今回の記事は物足りなく感じる人もいるかと思います。
しかし、リアクティブプログラミングについて何も知らずにRxJavaを使っても、単にソースコードが複雑になっただけと感じるかもしれません。そのため今回は、実際にサンプルを作成する前に必要となる知識として、リアクティブプログラミングとRxJavaの概要について紹介するのみにしました。
次回は今回の内容を踏まえ、実際にRxJavaを使ったサンプルを作っていきます。