どうもこんばんわ。
あまいろショコラータ2 攻略しました。かぐやちゃんがかわいくてよかった。(振り返り程度の説明だけなので)前作もやろう!!!。
ここのシナリオよすぎる
ここすき。
みつきさんには辛辣なかぐやちゃん!!!!?!?!?
ななちゃん性格のわりに制服かわいいのいい
いちかちゃんルートは必見です!!。ぜひ買って見てみてね
前作ヒロインがいい味出しててほんといい
心の中にちえりちゃんほしい
よかった。おすすすすすめです
本題
今年の夏、体温超えが連発してたけどようやく涼しくなってきたか?久しぶりにエアコン消した。
前回は Kotlin Coroutines のサスペンド関数に関してドキュメントを読んでいきました。
https://takusan.negitoro.dev/posts/amairo_kotlin_coroutines_suspend/
サスペンド関数の知識が必要なので、先に前回の記事を読んでおくことをおすすめします。
今回はFlow
編です。
ちなみにChannel
はほとんど使ったこと無いのでほとんど触れません。。。。ごめんね
環境
今回も今回とてサンプルコードはAndroid
で書きます。
が、特別なことがなければAndroid
じゃないKotlin/JVM
に転用できるはず。
複数回値を返せる Flow
https://kotlinlang.org/docs/flow.html
前回の記事のサスペンド関数では、一回しか値を返すことが出来ませんでした。
複数の値を一回だけ返す分にはPair
やTriple
、それ以上ならデータクラスを作り返せばいいのですが、複数回値を返したい場合はどうでしょう?
Kotlin Coroutines
のFlow
はそれを叶えます!
例えば、センサーの値を返すとか。センサーなら加速度でも気圧でも明るさでも何でもいいんですが、連続して値が来るため、複数回返せるFlow
の出番ですね。
それからWebSocket
でなにかメッセージを受け取るとか。これも複数回にわたってデータを受信するため、これもFlow
の出番です。
Android と Flow
Android
でもよく既に使われていて、例えばRoom
でリアルタイムに値を取得する方法にFlow
が使えます。
LiveData
でも出来ますが。。。
また、DataStore
もFlow
ベースで作られてますね。Key-Value
で変更があったらその都度Flow
を使って最新の状態にしてくれます。
xml
の頃ならコールバックで実装されていたであろう箇所も、Jetpack Compose
ではFlow
で作られていたり。
多分探せばもっとある。
Android
開発的にはLiveData
とやりたいことは大体同じって伝えれば伝わるかな。
他の言語だとなんだろ、それこそRxなんとか
とかsignal
とか?。(どっちも使ったこと無く正直わからない)
JavaScript
だと非同期イテレータ
なのかな。不定期に複数回にわたって値を送信したいという願いは叶ってそう。
最初の Flow
https://kotlinlang.org/docs/flow.html#flows
兎にも角にもなにかFlow
を作ってみましょう。もちろんコールバックからFlow
への変換も出来るのですが、ややこしくなりそうで。
10 回数字を出力するFlow
を作りました。
面白くはないんですが。
logcat
はこんな感じになるはず。
collect { }
はサスペンド関数になっていて、Flow
が終わるまでは一時停止し続けます。collect { }
を複数回使いたい場合はlaunch { }
でその都度囲って上げる必要があります。
いつ終わるのかと言うと、flow { }
の引数、ブロック内(関数の中)で最後まで進むとFlow
が終わります。
emit()
で値を発行出来て、これもサスペンド関数になっています。というかflow { }
の中はサスペンド関数が使えるようになっています。
なのでdelay()
等も呼び出すことが出来ます。ブロック内はサスペンド関数ですが、Flow
自体はサスペンド関数になりません。Flow
を返すだけです。
上記のコードを2つに分けるとしたら、こうなると思います。値を送る側と、値を受信する側。
Kotlinのドキュメントでは、送信側を エミッター(emitter)、受信側を コレクター(collector) と呼んでいるので、今回はこちらに合わせようと思います。
付録 他の言い方
Android
ドキュメントではemitter
をproducer(プロデューサー)
、collector
をconsumer(コンシューマー)
って呼んでてちょっとややこしい。
https://developer.android.com/kotlin/flow
ColdFlow
https://kotlinlang.org/docs/flow.html#flows-are-cold
さて、Flow
を学習するとまずでてくるのが、コールドフローとホットフローの2つ。何も考えずに機械翻訳に投げると流れは冷たい(追真)になって面白い。
コールドフローは、実際に受信が開始されるまで送信側のコードが動かないという特徴があります。
次のコードを試してみましょう。
logcat
はこうなります。
コレクターを呼び出すたびにエミッター側が呼び出されていることが分かります。値はそれぞれ発行されていますね。
コールドフロー
の対になるホットフロー
は、逆にコレクターの存在にかかわらず起動しているFlow
になります。
が、ドキュメントではしばらくコールドフローの話が続くので、ホットフローの話は最後の方に回します。
付録 陥りやすいミス
collect { }
は収集が終わるまで一時停止し続けるため、collect { }
の下に書いたコードは収集が終わるまで呼び出されません。(小泉構文並感)
複数collect { }
したい場合はcollect { }
するたびにlaunch { }
で囲うか(上記のように)、launchIn()
する必要があるのですが、それも後述します。
とりあえずはそれぞれlaunch { }
していきます。
launch がないよ!
前回の記事でlaunch { }
が使えるところ、使えないところ、その理由を話しているのでどうぞ!
https://takusan.negitoro.dev/posts/amairo_kotlin_coroutines_suspend/#構造化された並行性
キャンセル
https://kotlinlang.org/docs/flow.html#flow-cancellation-basics
詳しくは後述するのですが、サスペンド関数のキャンセルと同じ感じでFlow
をキャンセルすることが出来ます。
Flow を作る方法
https://kotlinlang.org/docs/flow.html#flow-builders
正しくはColdFlow
ですね。HotFlow
を作る方法はまた別にあります。
エミッター側を作る方法は、flow { }
以外にもあります。
例えばlistOf()
のようにflowOf()
があります。
配列と変わらないと思うじゃん?後述しますが変換用の演算子(map { }
とか)がサスペンド関数対応なんですよ、、、
あとはasFlow()
で配列等からFlow
に出来ます。
それから、よく使うであろうcallbackFlow { }
。これはコールバックで記述された関数をFlow
に変換する関数です。
例えば以下のコードは明るさセンサーの値をFlow
に変換したものになります。
logcat
はこんな感じになるはず。
最近のスマホ、ベゼルが細いからどこにセンサーがあるのかよく分からん。多分自撮りカメラの部分を手で覆ったりすれば小さい値になると思う。
逆に懐中電灯を向けると大きな値が出てくるはず。
これを書き換えて、例えば頭痛持ちなら気圧センサーの値をFlow
で受け取れるようにしてみるとどうでしょう?
ただ、気圧センサーはフラグシップモデルのスマホにしか搭載されない傾向があるので、(値段重視の)ミドルレンジとかのスマホだと試せないかもしれません。
あとはJetpack Compose
を使ってるならsnapshotFlow { }
でしょうか。
あとはサスペンド関数を一回の値を返すFlow
にすることも出来ます。どこで使うのかは、、、
::
を使うことで、関数型の参照を取得することが出来ます。https://kotlinlang.org/docs/lambdas.html#instantiating-a-function-type
これ以外にも、冒頭で話した通りRoom
のDAO
の返り値にFlow
を採用するとかで作ることが出来ます。
中間演算子で変換する
https://kotlinlang.org/docs/flow.html#intermediate-flow-operators
配列の操作用関数にはfilter { }
やmap { }
、distinctUntilChanged()
なんかがありますが、Kotlin Coroutines Flow
にもあります!
この辺からLiveData
を超え始めます。
これらをなんとなく分かれば、ありとあらゆるものをFlow
基準で考えたくなってくるはず!
filter { }
やmap { }
等は引数の関数がサスペンド関数になっているため、関数内でサスペンド関数を呼び出すことが出来ます!
面白い例が思いつかず申し訳ないのですが、こんな感じにサスペンド関数を呼び出すことが出来ます。
出力はこうです。2 で割れる数が500
ミリ秒ごとに出てくるはず。
複雑な変換をする
https://kotlinlang.org/docs/flow.html#transform-operator
map { }
じゃ足りないですか?transform { }
を使うことで、Flow
から来た値を増やして送信したり、逆に減らして送らないようにすることが出来ます。
例えば無駄ではありますが、受け取った数字の数だけ繰り返し同じ数字を送出するようなコードを。もちろんサスペンド関数なのでもっと面白いことができればよかったのですが。。。
長くなるので出力は途中までしか貼りませんが、出てきた数字の分だけ同じ回数出力されているはずです。
サイズ指定の収集
https://kotlinlang.org/docs/flow.html#size-limiting-operators
配列のそれと同じように、受信する数を決めることが出来ます。
これだけだとあんまり旨味がないかもしれませんが、後述するtoList()
とかで使えるかも!
take()
で上限に達すると、エミッター側であるflow { }
へキャンセルが伝達されます。finally { }
が呼ばれてますね。
付録 捨てる
drop()
で逆に捨てることが出来ます。
例えば、最初の値を捨てたい場合、Jetpack Compose
だとsnapshotFlow { }
を使うと、初期値もFlow
で送りますが、実際に値が変化してからFlow
で受信したい場合があると思います。
そこで、drop(1)
を使うことでmutableIntStateOf()
の初期値は受け取ること無く、その次の値がFlow
から送られてきた場合に収集することが出来るようになります。
末端演算子
https://kotlinlang.org/docs/flow.html#terminal-flow-operators
Flow
から値を受け取るための演算子を末端演算子とか呼んでいるそうです。コンシューマーのことですね。
まずはcollect { }
。これはFlow
から値を受け取り、終わるまで一時停止してくれるやつです。
書き方も2種類くらいあって、まずはcollect { }
ですね、引数の関数で受け取ることが出来ます。
もう一つ、collect()
とonEach { }
を使うパターン。配列操作のonEach { }
のそれと同じですが、onEach { }
には収集を開始する機能が無いため、collect()
で収集を開始するようにします。
どっちも同じ仕事をします。
また、Flow
から1つの値を取り出すための演算子もあります。
first() / first { }
とsingle()
です。また、彼らにはOrNull()
版があります。
first()
は最初の値が来るまで一時停止し、first { }
は引数の関数でtrue
を返すまで一時停止します。true
を返した時点の値を返してくれます。
Flow
でよく使うこれ。これすき。
logcat
はこんな感じになります。first { }
OrNull()
の方を使うと、Flow
で何も値が来ない場合に、例外ではなく変わりにnull
を返してくれるやつです。
こんな感じに、値が来なかった場合はnull
が帰ってきます。
single()
は、値が一回だけ送られてくるFlow
の場合に利用できます。値が来ないで終わった場合と値が複数回流れてきた場合は例外を投げます。
first()
と違って複数回流れて来る場合に使えません。singleOrNull()
で例外の代わりにnull
を返してくれます。
logcat
はこんな感じ。
あとは、toList()
でしょうか。Flow
から受信した値を配列にしてくれます。
take()
なんかで上限を決めることも出来ます。
logcat
はこう
付録 そのほかの演算子
直接Flow
の値には触らない、開始時に呼ばれるonStart { }
や終了時に呼ばれるonCompletion { }
等もあります。またemit()
で値を送信することができるため、
例えば、エミッター側が最初の値を送信するまで時間がかかる場合に、onStart { emit() }
で初期値を即時送信する。なんて事ができます。
あとよく使うのはonEach { }
かな、
値を消費もせず変換もせず、ただ値を傍聴するだけですが、サスペンド関数を呼び出せるので遅くしたり、Flow
の値を確認したいときとかに使えます。
フローは連続的
https://kotlinlang.org/docs/flow.html#flows-are-sequential
今更だとは思いますが、末端演算子は上から順番に適用されますよって話。宣言的でいいですよね。
logcat
を見ると、2 で割り切れない場合はmap { }
に進んでないことが分かりますね。
Flow とコルーチンコンテキスト
https://kotlinlang.org/docs/flow.html#flow-context
collect()
やfirst()
等を呼び出したコルーチンのコルーチンコンテキストで、エミッターから中間演算子の処理がされるという話。
今のスレッド名をログを出してみましょう。
logcat
が長くなってしまったので途中までですが、collect()
をDispatchers.Main
で呼び出したため、main
と表示されます。
Dispatchers.Main
の部分をDispatchers.Default
にすれば別スレッドでFlow
を動かすことが出来ます。
これならmap { }
でインターネット通信を伴う場合も落ちなくなりますね!
withContext が使えない場所
https://kotlinlang.org/docs/flow.html#a-common-pitfall-when-using-withcontext
エミッター側flow { }
のemit()
は、呼び出しスレッドを変更してはいけないルールがあります。(というかコルーチンコンテキストが違ってもダメ?)
emit()
をwithContext
で囲うと怒られてしまいます。
emit()
は元のコルーチンコンテキストから呼び出すか、flowOn()
演算子を使うかのどちらかをする必要があります。
flowOn
https://kotlinlang.org/docs/flow.html#flowon-operator
Flow
を実行するスレッド、もといコルーチンコンテキストを変更するための方法です。
collect()
等のエミッター側をwithContext
で切り替えた中で呼び出す方法もありますが、flowOn()
でも出来ます。
これは巻き上げになります。flowOn()
よりも上のエミッター側と中間演算子が引数のコルーチンコンテキスト
、もといスレッドで呼び出されます。
逆にflowOn()
より下側のコレクター側と中間演算子は、コレクターを呼び出したコルーチンコンテキストで実行されます。
ログを出すようにして、どのスレッドで処理されているかを確認できるようにしてみましょう。
logcat
はこうです。
ちゃんとflowOn
より上は、指定したコルーチンコンテキスト(スレッド)が、それより下はcollect()
を呼び出したコルーチンコンテキスト(スレッド)で処理されている事がわかりますね。
バッファリング
https://kotlinlang.org/docs/flow.html#buffering
Flow
はサスペンド関数を多用するため、しばし時間がかかるサスペンド関数を呼び出してしまう場合があると思います。
助けになるかもしれないbuffer()
例えば以下のコード、エミッター側は値を送信するのに1秒
かかります。また、コレクター側も1秒
かかるとします。
これだとprintln()
まで2秒
かかることが分かりますね。
確かにlogcat
を見ると20秒(20_000 ミリ秒)かかっていることが分かります。
しかし、collect { }
で時間を待っている間に次の値を受信しておく事もできるとは思いませんか?
buffer()
はそれを叶えます。実際に動かさないとわからないと思うのですが、
- 最初の値はもちろん、エミッター側
1秒
+ コレクター側1秒
かかります
- 2つ目以降は、最初のコレクター側
1秒
を待っているの間に、エミッター側1秒
を消費したため、コレクター側1秒
待つだけで出力されるようになります。
logcat
はこうで、最初の値だけ2秒
かかっているので、かかった時間はコレクター側10秒
+エミッター側1秒
になります。
クソ雑な絵です。buffer()
でコレクター側とエミッター側が同時に動くよってことが分かれば。
途中の値は消す
https://kotlinlang.org/docs/flow.html#conflation
コレクター側が時間かかる場合で、かつ毎回処理する必要がない場合に使えます。コレクター側が間に合う分だけ処理すればいいみたいな。
たとえば以下のような、エミッター側は1秒間隔で値を送信し、エミッター側では3秒かかる場合、エミッター側で処理中の値は捨てられることになります。
今回はたまたま1秒毎に値を出していたため、3の倍数がきれいに出ています。
最初と最後は除く。完全にコレクター側の都合だけでオッケーな場合はこれでいいはずです。
値が来たら再起動
https://kotlinlang.org/docs/flow.html#processing-the-latest-value
これだいすき、よく使う。
なんならこれだけで前記事を書いたことがある。https://takusan.negitoro.dev/posts/kotlin_coroutines_flow_latest/
これはコレクター側がまだ処理中の間に、エミッター側から値が来た場合に、コレクター側を一旦キャンセルし、新しい値で再度起動してくれるやつです。
collectLatest { }
の他にもmapLatest { }
やtransformLatest { }
があり、同様に処理中の間に新しい値が来た場合にキャンセルし再起動してくれます。
これは最新の値だけ処理できればいい場合に使います。
例えばcollect { }
した結果から、別のFlow
をcollect { }
したい場合、普通にcollect { }
すると、値が来た分だけ別のFlow
が起動してしまいます。
Flow#collect
の中でFlow#collect
したい場合ですね。
これだと、userListFlow().collect { }
の中でcommentListFlow().collect { }
していますが、commentListFlow()
は無限ループでずっと送信し続けるため、
userListFlow()
から値を受け取ることが出来ません。(collect { }
はFlow
が終わるまで一時停止し続けるので)
そこで、collectLatest { }
です。
これで、userListFlow()
から来た値でcommentListFlow()
を再度作る事ができるようになりました。
再起動できたので、ユーザー更新
のprintln
も動きます!!
transformLatest { }
とかも同様に使うことが出来ます。
collectLatest { }
だと、Jetpack Compose
じゃ使えないですからね(Jetpack Compose
の場合はFlow#collectAsState()
したいので、Flow
を返して貰う必要がある)
ちなみに、おそらくこれをやるための適切な演算子、flatMapConcat { }
等があるんですが、その話は後で!
付録 複数の Flow をまとめる
次のセクションに行く前に触れておこうかと。
↑の例でmerge()
をこっそり使ってたのですが、説明します。
まずはドキュメントでは触れられてないけどmerge()
、これはFlow
の配列を1つのFlow
にすることが出来ます。
こんな感じ。
複数の Flow を組み合わせる
https://kotlinlang.org/docs/flow.html#composing-multiple-flows
次、ドキュメントに戻して、zip
とcombine
ですね。
これらは、それぞれのFlow
から来た値を加工したりなんかして、単一のFlow
にすることが出来ます。
zip vs combine
で調べれば色んな人が図解して説明してくれているので、今更説明するまでもないかなって思ったけどせっかくなので。
zip
https://kotlinlang.org/docs/flow.html#zip
zip
は、それぞれのFlow
から新しい値が出揃った時に出力します。攻略の鍵としては、新しい値が出揃ったといったところでしょうか。
以下のコードを試してみましょう。
Android のリリース年とそれに対応するバージョンをFlow
で出してみる例です。
zip() { a, b -> }
のa
とb
は好きな名前に出来ます。今回は変数の範囲がブロック内、超限定的なので適当にa
とb
にしています。
logcat
の出力結果はこうです。
year = flowOf()
の方では、勢い余って2025
まで作ってしまいましたが、出力結果には2025
がありません。これはなぜかと言うと、2024
まではyear
とandroidVersion
両方から新しい値が送信されていたのですが、
androidVersion
は15
までしか無く、2025
に対応する値がandroidVersion
のFlow
には存在しないためにこのような事態になっています。
先述の通り、zip
はFlow
から新しい値が出揃った時。なので、出揃わない場合は出てくるまで待つことになります。
combine
https://kotlinlang.org/docs/flow.html#combine
もう一つ、combine()
を見てみましょう。
これは、Flow
のどれかから新しい値が出たら、値を送信する事ができます。zip
と違い、全てのFlow
から新しい値が出揃う必要はないです。
例えばバックアップアプリを作ろうとします。バックアップが実行される条件は適当に考えてこちらです。
また、どれか 1 つでも条件が変化したらバックアップも停止してほしいですよね。
というわけでFlow
を使い、バックアップを起動する処理を書いてみましょう(流石にバックアップ処理は書きません)。
まずはそれぞれの状態を通知するFlow
を作ってみました。
充電器に指した、抜いたたtrue / false
を送信するFlow
、Wi-Fi
接続状態に変化があればtrue / false
を送信するFlow
、夜かどうかのFlow
を作り、
すべての条件がtrue
だったらbackupTask()
を呼び出すようにしてみました。
combine()
は 1 つでもFlow
から新しい値が来たら再度関数が呼ばれるため、collectLatest { }
を使いました。
これでfalse
になったときにbackupTask()
をキャンセルできます!
実際に動かして、機内モードをON / OFF
繰り返してみたりすると、キャンセルされた旨が表示されるはずです。
付録 図解
zip()
とcombine()
、もう何人もの人が図解化してるので今更書くまでもないと思いますが一応。
付録 combine() と初期値
上記の絵を描いている時に思ったんですが、例えcombine()
だとしても、Flow
から全て出揃ってないと最初の値は流れてこないんですよね。
それが困る場合があるかなと思います。極端に最初の値が来るのが遅いとか。
その場合はonStart { }
で初期値としてなにか流しておけばいいのかなってちょっと思った。
あとはHotFlow
に変換して常に動かしておくとか。常に動かせば初期値に時間がかかるFlow
でもなんとかなりそう。
Flow の中で Flow を作る
https://kotlinlang.org/docs/flow.html#flattening-flows
collectLatest { }
やtransformLatest { }
でもう既にやったネタですが、、
Flow
で受信した値を元にFlow
を作りたい場合があると思います。Android
だとRoom
のFlow
で受信した値でFlow
を収集したい事がありそう。
map { }
でFlow
を返すと、もれなくFlow<Flow<T>>
とかいうジェネリクス訳わからんことになります。
Flow<Flow<T>>
をFlow<T>
の形にしたいですよね、このままだとcollect { }
でFlow
を受け取る羽目になる。。。
flatMapConcat
https://kotlinlang.org/docs/flow.html#flatmapconcat
これは、flatMapConcat { }
で返したFlow
が終わるのを待つという特徴があります。
そうです。Flow
を返す際はおわりがあるFlow
を返す必要があります。上記のコードではcommentFlowLimit()
が無限ループしないのでおわりがあります。
logcat
はこんな感じで、flatMapConcat { }
で返したFlow
が終わるまではuserFlow()
から受信した値を使わないという特徴があります。
commentFlowLimit()
は5個まで出すので、それが終わってから次の値でflatMapConcat { }
を呼び出しているわけですね。
flatMapMerge
https://kotlinlang.org/docs/flow.html#flatmapmerge
こっちは、値を受け取ったらflatMapMerge { }
を即呼び出し、flatMapMerge { }
で返されたFlow
の収集を始めます。
flatMapConcat { }
と違いFlow
の終了を待たないので、おわりがないFlow
でも使えます。
logcat
はこうで、即呼び出すため、並列で何個もFlow
から受信することになります。
flatMapLatest
https://kotlinlang.org/docs/flow.html#flatmaplatest
Latest
系列は、新しい値が来たらキャンセルしてもう一回起動すると言いました。
例に漏れず、これも新しい値が来たらflatMapLatest { }
で返されたFlow
の収集をキャンセルし、新しく返されたFlow
で収集を初めます。
logcat
の出力はこうで、flatMapConcat { }
と同じような出力がされるのですが、
今回はcommentFlowLimit()
ではなく、無限ループするcommentFlow()
を呼び出しています。新しい値が来たらキャンセルするため、おわりがない場合もキャンセルしてくれるためです。
Flow の例外
https://kotlinlang.org/docs/flow.html#collector-try-and-catch
コレクター側の例外
try-catch
で例外を捕まえることが出来ます。
キャッチできるので、アプリはクラッシュしません。
エミッター側、中間演算子の例外
https://kotlinlang.org/docs/flow.html#everything-is-caught
エミッター側や、中間演算子で発生した例外も、try-catch
でキャッチできます。
キャッチしたため、同様にクラッシュしません。
例外の透過性
https://kotlinlang.org/docs/flow.html#exception-transparency
ここの説明は、catch { }
の後に話すので、まずはcatch { }
について聞いてって
catch 演算子
https://kotlinlang.org/docs/flow.html#transparent-catch
catch { }
を使うことで、try-catch
のように例外をキャッチすることが出来ます。
emit()
も可能です。これも巻き上げなので、catch { }
より下で発生した例外はキャッチされません。宣言型。
コレクター側の例外も catch したい
https://kotlinlang.org/docs/flow.html#catching-declaratively
collect { }
以外に、もう一つ書き方があるといいました。onEach { }
とcollect()
を組み合わせる方法ですね。
これとcatch { }
を使うことで、エミッター側、コレクター側、中間演算子の例外全てをキャッチすることが出来るようになります。
ま、まあcollect { }
とtry-catch
でもいいんですがこういう方法もあるよって。
例外の透明性は何が言いたかったのか
順番が前後しちゃってすいません。
ドキュメントでは、Exception transparency
、翻訳すると例外の透明性
だって、よくわからない。
Flow
で例外が発生した場合に、println()
するような、catch { }
演算子のようなものを自前で作ってみることにしましょう。
しかし、これはcatch { }
と同じ動作をしません。まず、以下のコードを試してみましょう。
catch { }
演算子は巻き上げで、自分より上のエミッターや中間演算子で発生した例外のみをキャッチし、自分より下のコレクターや中間演算子の例外はキャッチしないという特徴がありました。
ただ、自前で作ったcatch
演算子はFlow
全体の例外をキャッチしてしまっています。
map { }
やfilter { }
などは上から順番に処理されるのでなんとなくは予想できますが、全体の例外をキャッチされてしまっては予測が困難になります。
それでは、例外時に値を送信する機能もつけてみましょう。
catch { }
がemit()
できるなら自前で作ったやつだって出来るはず!
しかしこれも動きません。
try-catch
のcatch
でemit()
を呼び出すことは禁止されています。
おそらくは、例外が投げられたことでFlow
は終了するはずだったのに、emit()
されて困っていると言ったことろでしょうか。
(正直良く分かっていない)
自前catch { }
なんて作らずに用意されたcatch { }
を使えば良いです。自分で適当に作ると全体の例外をキャッチしちゃうので。。。
フローの完了
https://kotlinlang.org/docs/flow.html#flow-completion
完了、終了を知ることが出来ます。
try-finally
https://kotlinlang.org/docs/flow.html#imperative-finally-block
サスペンド関数のときと同じくfinally { }
で終わりを知ることが出来ます。
logcat
はこう。まあ予想通り。
onCompletion 演算子
https://kotlinlang.org/docs/flow.html#declarative-handling
onCompletion { }
は、中間演算子の付録で触ったけど、そう言えば言ってないことがあったので。
これもcatch { }
演算子のそれと同じく、finally { }
とだいたい同じです。
ちなみに、onCompletion { }
は、引数にThrowable
を貰えます。
これは例外で終了した場合には例外を、正常に終了した場合はnull
を渡してくれます。
logcat
を見ると、今回は正常に完了したので、null
がでていますね。
catch との違い
https://kotlinlang.org/docs/flow.html#successful-completion
catch { }
と違って、例外をキャッチするわけじゃないので、try-catch
でくくるか、catch { }
をつけるかしないと落ちます。
onCompletion { }
は呼ばれるのですが、その後例外で落ちてしまいます。
logcat
には例外が
例外処理はどっちがいいの
https://kotlinlang.org/docs/flow.html#imperative-versus-declarative
try-catch-finally
と、catch { } onCompletion { }
どっちがいいかという話。
これは、どっちを推奨するとかはないらしい。好きな方を使って大丈夫。
Flow を起動する
https://kotlinlang.org/docs/flow.html#launching-flow
後回しにしていたlaunchIn()
ってのがあるよという話です。
collect()
を呼び出すと、Flow
の収集が完了するまで一時停止し続けるわけです。
間違えやすいミスとしては、collect { }
の後に別のFlow
のcollect { }
をしちゃう場合。先述の通り完了するまで一時停止するため、終わるまで進みません。
終わるのを待たず、並列でFlow
から収集をしたい場合、collect { }
してる箇所をlaunch { }
で一個一個囲っていくか、launchIn()
を使うかです。
まずはlaunch { }
でくくっていく場合。ちょっとネストが深くなっちゃうけど、シンプルでいい。
再度宣伝しますが、launch { }
が使えるところ、使えないところとその理由。を前回のサスペンド関数のドキュメントを読んでみようの記事で触れているので良ければ。
https://takusan.negitoro.dev/posts/amairo_kotlin_coroutines_suspend/#構造化された並行性
話を戻して、もう一つ、launchIn()
を使う方法があります。
これは一時停止する代わりに、Job
を返します。これはlaunch { }
したときの返り値と同じで、これを使うことでキャンセルが出来ます。
(まあコルーチンスコープをキャンセルすることでもキャンセルが出来ます)
logcat
をみるとこんな感じで、一番最後のprintln()
が先に呼び出されるようになります。
Android ライフサイクルと collect
https://developer.android.com/topic/libraries/architecture/coroutines#lifecycle-aware
今まで書いてきたコードでは、アプリを切り替えるなどしてバックグラウンド状態にしてもFlow
から値の収集が動いてしまいます。
ユーザーには見えていないのにFlow
の収集が動くのは、無駄にバッテリーを消費したり、インターネット通信が伴う場合は通信されてしまうため、あんまり良くないですね。見えないところで何やってんだって。電池もギガ(Z世代並感)も有限なので。
Android
では、ユーザーに実際に表示されているときのみFlow
から収集する機能が用意されています。Android
チームが作ってくれました。
もう少し具体的に言うと、ライフサイクルがonStart ~ onStop
の間だけFlow
から収集する方法があります。onStart
まで進んだらFlow
の収集が始まり、onStop
以降に進んだらキャンセルされます。
これを動かしてみて、適当にバックグラウンドいったりフォアグラウンド行ったりするとこんな感じで、何回かfinally !
が出力され、数字も戻っているはず。
なのでlogcat
はこんな感じです。
ちなみに、これのJetpack Compose
版もあります。
build.gradle (.kts)
で、implementation("androidx.lifecycle:lifecycle-viewmodel-compose:2.8.6")
を書き足す必要がありますが。
collectAsState()
をcollectAsStateWithLifecycle()
に置き換えます。
これで画面に表示されているときのみ更新するようになります。
Flow とキャンセルチェック
https://kotlinlang.org/docs/flow.html#flow-cancellation-checks
サスペンド関数のときはensureActive()
やisActive
等でキャンセルチェックをしましょうねという話をしました。
Flow
の場合もだいたい同じで、キャンセルを考慮する必要が多々あります。
まずはflow { }
で作ったFlow
。これはキャンセル可能です。キャンセルしたらもう来ません。
試してみましょうっておもったけど、これもAndroid
だとサンプルコードがそのままクラッシュせずに動いちゃいますね。
じゃあ飛ばして、asFlow()
等で作ったFlow
はキャンセルされてても値が来てしまいます。
もちろん、中間演算子や、コレクター側でキャンセル対応サスペンド関数を呼び出せばそこでキャンセルされるのですが(delay()
やwithContext { }
)、それすらもない、最小構成の場合はキャンセル後も値が出てきます。
もしこれをキャンセル対応にしたい場合は、cancellable()
をつけると良いです。
これで、3
以降の値は来ていないことが分かりました。
むすび
https://kotlinlang.org/docs/flow.html#flow-and-reactive-streams
Flow
はRxなんとか
等に似せて作ったよって話と、Rx
と変換するライブラリも用意したよって話で終わりです。
公式ドキュメントは以上!長かった・・・
ちょっとまって? HotFlow の話が入ってないやん
親方に電話させてもらうね
https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-shared-flow/
はい、なぜだか知りませんがホットフローに関してさっきの公式ドキュメントでは触れられていません。ので手探りでやっていこうと思います。
ちなみに一応Android
側にはあります。良くも悪くもGoogle
が作ったって感じのドキュメントですが。
https://developer.android.com/kotlin/flow/stateflow-and-sharedflow
ColdFlow だと困ること
https://elizarov.medium.com/shared-flows-broadcast-channels-899b675e805c
ここまで作ってきたFlow
はFlow
の中でもCold Flow (コールドフロー)
と呼ばれるものです。
flow { }
のブロック内で値を送信する処理が完結しています。
これにより、collect()
でブロック内の処理が開始し、ブロック内一番最後に到達すればcollect()
の一時停止が終了するわけです。
ブロック内に閉じ込められているため、開始と終了は明確ですね。
ところで、flow { }
のブロック内よりも広いスコープでemit()
したい場合はどうすればいいでしょうか。
上記の例では終わりがあるのため、ブロック内でしかemit()
出来ないですが、これとは別に終わりがないFlow
が存在すれば他の箇所からemit()
出来てもいいのではないかと。
え?emit()
を他の箇所からしたい理由?それはcollect()
するたびにflow { }
のブロック内が呼び出されると無駄使いになるから。
インターネット通信が伴う場合、一回flow { }
を起動したら 2 回目以降はそれを使いまわしてほしいと思うでしょう。でもブロック内だと毎回呼ばれちゃうから。emit()
するコードを動かしたいわけ。
要はLiveData
の代わりに使えるって、皆言ってたのに今のところ出てきてねえじゃねえかって。
HotFlow
SharedFlow
とStateFlow
がそれらを解決します。
これらは、終わりがない代わりに、emit()
が自由にできます。ブロック内の成約がなくなります。
難しいですか、じゃあMutableLiveData / LiveData
はMutableStateFlow / StateFlow
に置き換えが可能ですと言えばいいかな。使い方はほぼ同じでFlow
の強力な中間演算子付きです。
ColdFlow vs HotFlow
HotFlow は終了しない
ColdFlow
はcollect()
したら起動し、ブロック内の処理が終わると終了します。collect()
も一時停止が解除されます。
HotFlow
は作った時点で起動し、終わりません。ColdFlow
と違い終わりを知るすべが無いためですね。終わりがないため、collect()
したらそれ以降の処理が呼ばれません。
先述の通り、launch { }
でくくるなどの対策が必要です。
また、toList()
等の終了する前提で作られたコレクターもおそらく期待通りに動きません。終わるまでList
に詰めてくれるやつですが、終わらないんですから。
一応toList()
はtake()
を使い上限を決めることで期待通りに動くようになります。
値は共有される
ColdFlow
はcollect()
のたびにブロック内が起動するため、それぞれ独立していました。そのため、値は共有されません。それぞれで作られます。
HotFlow
はemit()
した値を全てのコレクター側で共有します。同じものが届けられます。
HotFlow の使い方
SharedFlow
もStateFlow
もMutable
版があります。
Mutable
版を作った後、Jetpack Compose / Activity / Fragment
から参照するための非Mutable
版のFlow
をpublic
で公開するのがお作法です。
これによりHotFlow
への書き込みはViewModel
内に限定されるため秩序が保たれます。
まんまLiveData
のそれなのですが、1 つ疑問が
SharedFlow vs StateFlow
短い答えとしては、LiveData
の代替はStateFlow
になります。
真面目に話すと、Flow
、SharedFlow
、StateFlow
の関係性は以下のようになります。
はい。StateFlow
はSharedFlow
を元に作ったものですね。これだけだと進化版に見えますが、使い所さんが違います。
まずはSharedFlow
。これはHotFlow
ですが、LiveData
のように最新の値を保持するわけではありません(正しくは保持できるがデフォルトだとしない。引数を調整するか、StateFlow
を使う)。
そのため、collect()
するタイミングによって、受け取れる値が変わります。以下の例を試してみましょう。
logcat
はこうで、collect()
した後に送られてきた値のみ受信するという挙動になっています。
[2]
がなぜ1
を受信していないかと言うと、collect()
する前にemit(1)
されたからですね。名前通り値を共有するのには使える感じですね。
次にStateFlow
もみてみましょう。
こちらは初期値が必要なんです。状態更新はemit()
もありますが、.value
でセットするか、update { }
で出来ます。これらは非サスペンド関数からも呼び出せます。
logcat
の出力がこんな感じで、今回は[2]
でも1
を出力しています。
これは、SharedFlow
とは違いStateFlow
の場合は最後の値(つまり最新の値)を常に持っているためです。最後の値が必要なので、MutableStateFlow()
を作る際には初期値が必要になります。
LiveData
と違いKotlin
で書かれているため、null
にする場合は?
を型に付ける必要があります。安全!
もちろんSharedFlow
でも保持するように引数を調整できるのですが、あんまりやらないかなと、、、
StateFlow
あるしそれでいいじゃんて
あとはStateFlow
は同じ値を入れた場合は送信しないという特徴もあります。
distinctUntilChanged()
が組み込まれているため、StateFlow
にdistinctUntilChanged()
をつけても意味がないです(map { }
等で変換した後では効くと思います)
ColdFlow を HotFlow にしたい
ColdFlow
でインターネット通信をしたいけど、その都度起動すると通信量がその分だけかかってしまいます。値を共有できるSharedFlow / StateFlow
にしたいけど書き直すのも面倒だって?
flow { }
やasFlow()
で作ったFlow
をHotFlow
に変換する関数があります。これでMutableStateFlow()
用に書き直す必要はありません。
まずはColdFlow
で書いたFlow
を 2 箇所で受信して動かしてみる。
せっかくなのでインターネット通信をするFlow
を作ります。インターネット通信用ライブラリのOkHttp
を入れて、android.permission.INTERNET
権限を付与して、以下のコードです。
ちなみに叩いてるAPI
はAWS
のグローバルIP確認API
です。バックエンド開発ならまだしも、Android
でグローバルIPを知れて嬉しいことなんて無いと思いますが。
logcat
を見てみると、ColdFlow
の特徴通り、collect()
するたびにブロック内の処理が起動してしまっているので("Flow 起動"
が2回出ている)、インターネット通信を余計にしていることになりますね、無駄!
一回取得したらcollect { }
間で共有してほしいですよね。
HotFlow
にするにはstateIn()
かshareIn()
を呼び出すとよいです。
stateIn()
がStateFlow
、shareIn()
がSharedFlow
になります。常に最新の値を持ってほしい場合はstateIn()
、ただ共有だけできればいい場合はshareIn()
でいいんじゃないかなと。
付録 SharingStarted の種類
https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-sharing-started/-companion/
- Eagerly
stateIn
/shareIn
を呼び出すとすぐに動き出します
- Lazily
- WhileSubscribed
- 誰かが
collect()
するまで動かないし、collect()
している箇所がなくなったら終了する。誰かが受信している間は動く
- 自作する
付録 HotFlow 好きな使い方
好きなHotFlow
変換ユースケース発表ドラゴン
個人的にはコールバックのAPI
をcallbackFlow { }
でFlow
にした後、stateIn()
するのがすきです。
というのも、コールバックのAPIってのは大抵、コールバックの呼び出し以外で値を取得する術がないんですよね。
うーん、説明すると難しいな。コールバックを待ってからじゃないと値が取れないのはもちろんなんだけど、コールバックの引数ってコールバック以外で取れないんですよね。
getSuccessValueOrNull()
みたいな、前回成功したコールバックの値を返す関数が生えていてほしいんだけど、大抵は無い。
擬似コードを無理やり書いて、しかも待ち合わせを一定時間待つことでやっているガチで良くないコードですが、こういうのが欲しい
で、これをstateIn()
するとどうなるかと言うと、最後の値を常に持っていてくれるため、処理が終わるまで待ちたければcollect()
するし、確実に非同期処理が終わっていれば.value
プロパティで取得できるので、私はこのパターンを良く使ってます。
Camera2 API
はこの技を使うとずいぶんコールバックの数がマシになる。
まあ後は、最新の値を持ってくれているという特徴を活かして、いつ呼ばれるか分からんコールバックをcallbackFlow { }
とstateIn()
するやつ。
例えば最初の値が来るまで時間がかかっていつ呼ばれるか分からん非同期処理なんかは、stateIn()
で即起動しておく技をやっています。
完了すればcollect { }
で受信できるし、.value
でも取れるしで。Flow.first()
で非同期処理の値が来るまで一時停止する技もすきです。
まあ後は(オイいい加減にしろ)、コールバックの登録が一箇所しか出来ないやつもcallbackFlow { }
とstateIn()
で変換して、複数個所から受信できるようにするやつ。これもすき
ライブラリによっては setListener { } / setCallback { } じゃなくて、 addListener { } / addCallback { } を用意してくれるライブラリもあるんですが、大体はsetListener { } / setCallback { }
で一箇所しかコールバックを登録できない。。。ので。
Channel
https://kotlinlang.org/docs/channels.html
すいません、これはマジでほとんど使ったことがなくドキュメント通りの話しかできません。
今のところFlow
で間に合ってるので使わないかなあ
これは、複数回値を返したいというよりかは、launch { }
を超えた値の共有のために使われてそう?
コルーチン間で値のやり取りができます。Flow
でもできますが、これは値が送れたことが確実になるまで一時停止してくれます。BlockingQueue
のコルーチン版らしい。
例えば以下のコードを試してみましょう。
logcat
はこうです。send()
したら、receive()
するまで一時停止する感じです。
これはFlow
とは違い、確実に受信したかを確認することが出来ます。
コルーチン間で処理が分かれていて、別のコルーチンに処理を投げたい場合にChannel
は便利なんだと思います。受信が確実に分かるので。
ごめん、使ったことがなくどういう時に使えばいいかわかんないや
おわりに
以上です。おつかれさまでした。88888