私はMonix 3このコードを使用しており、次のようなコードを持っています:
Observable.fromIterable(Seq(1, 2, 3, 4, 5, 6, 7, 8, 9))
.flatMap(i =>
if (i % 2 == 0) { // Bad i
Observable.empty
} else
Observable.pure(i)
)
.foreachL(i => print(s"Good i: $i")) /*Output: Good i: 1
Good i: 3
Good i: 5
Good i: 7
Good i: 9*/
このコードは問題なく動作しますが、長時間の IO 操作が多数あるため、次のようにリファクタリングすることにしました.mapParallelUnordered。
Observable.fromIterable(Seq(1, 2, 3, 4, 5, 6, 7, 8, 9))
.mapParallelOrdered(3)(i =>
if (i % 2 == 0) {
Task.raiseError(new Exception(s"Bad i: $i"))
} else
Task.pure(i)
)
.foreachL(i => print(s"Good i: $i")) /*Output: Good i: 1*/
最初の例と同じ結果を取得しようとしていますが、並列処理を行っています。問題はTask.raiseError、オブザーバブル全体を殺すため、 で停止しi = 2ます。
エラーを処理し、Observable を存続させる方法は?