以前(ずいぶんと前だけど)に,RxJavaについてのエントリで,「RxJavaに Promise.all()
が欲しい」という話をしました.この時に紹介したRxJavaのconcatMapEager
は,当時はExperimentalでしたが現在はStableになっています.
今回はRxJavaではなくRxJSについてです.
先のエントリの中で,私が求めいたのは
- 処理が並行して実行される
- 順番が保存されている
という特徴を持ったオペレータでした.そして最近,RxJSのforkJoin
を知りました.
RxJSのforkJoin
RxJSにforkJoin
というAPIがあります.これはObservableのリストを受け取り,それらが最後にemitした値のリストのObservableをつくります.
例えば
forkJoin(
of('君がッ', '泣くまで'),
of('殴るのを', 'やめないッ!'),
).subscribe(console.log); // ["泣くまで", "やめないッ!"]
という具合です.forkJoin(...)
はPromise.all(...)
のObservable版なのです.
次に,こんなモノを用意しました.
const rand = () => Math.floor(Math.random() * 3) * 1000;
const delayedEcho = (message) => of(message).pipe(delay(rand()));
ランダム時間後にメッセージをそのまま返してくれる,その名もdelayedEcho
です.
const ids = ['1', '2', '3', '4', '5'];
const observables = ids.map(delayedEcho);
forkJoin(observables).subscribe(console.log);
// ["1", "2", "3", "4", "5"]
はい,完全に思い通りに動いてくれました.
xxxMapなオペレータ達
ここから先はおまけです.先ほどのdelayedEcho
で遊んでみます.
mergeMap
from(ids)
.pipe(
mergeMap(delayedEcho),
toArray()
)
.subscribe(console.log);
// ["3", "5", "1", "2", "4"]
mergeMap
はflatMap
です (flatMap
がmergeMap
のエイリアス).処理は並行に走りますが,購読される値は先着順です.
concatMap
from(ids)
.pipe(
concatMap(delayedEcho),
toArray()
)
.subscribe(console.log);
// ["1", "2", "3", "4", "5"]
concatMap
は,処理が直列です.現在の処理が完了するまで次の処理を開始しません.したがって,上記の例はmergeMap
の平均およそ5倍の時間が掛かります.
switchMap
from(ids)
.pipe(
switchMap(delayedEcho),
toArray()
)
.subscribe(console.log);
// ["5"]
switchMap
は最新のObservableに関心を切り替えます.
ここで,delayedEcho
に少し小細工をします.
const delayedEcho = (message) => of(message)
.pipe(
tap(console.log), // <= これを追加
delay(rand())
);
そして先ほどのswitchMap
のコードをもう一度実行すると…
from(ids)
.pipe(
switchMap(delayedEcho),
toArray()
)
.subscribe(console.log);
// "1"
// "2"
// "3"
// "4"
// "5"
// ["5"]
"1"
から"4"
は無視されたかと思いきや,全てのObservableの処理は実行を開始していたことがわかります.
exhaustMap
最後がexhaustMap
です.このオペレータは重要です.
from(ids)
.pipe(
exhaustMap(delayedEcho),
toArray()
)
.subscribe(console.log);
// "1"
// ["1"]
exhaustMap
は,前の処理が完了していなかい場合には後続の処理を無視します.この挙動は,非同期処理が不整合を起こさないための様々なケースに使えます.