Observable
のリストの結果からリアルタイムでを生成したいFutures
。
最も単純なケースでは、 で実行している先物のリストがあり、Future.sequence
で進行状況を監視しているとObservable
します。私は基本的に次のようにしています:
def observeFuturesProgress(futs: List[Future[Int]]) : Observable[String] = {
Observable[String](observer => {
val loudFutures: List[Future[Int]] = futs.map(f => {
f onComplete {
case Success(a) => observer.onNext(s"just did $a more")
case Failure(e) => observer.onError(e)
}
f
})
Future.sequence(loudFutures) onComplete {
case Success(_) => observer.onCompleted()
case Failure(e) => observer.onError(e)
}
})
}
これは私のテスト環境では問題なく動作します。onNext
しかし、少なくとも重複する呼び出しがないことに注意せずに、異なるスレッドから呼び出すべきではないことを読んだところです。これを修正するための推奨される方法は何ですか? の多くの実際の使用でObservables
は、このような非同期コードから呼び出す必要onNext
があるようですが、ドキュメントで同様の例を見つけることができません。