5

Observableのリストの結果からリアルタイムでを生成したいFutures

最も単純なケースでは、 で実行している先物のリストがあり、Future.sequenceで進行状況を監視しているとObservableします。私は基本的に次のようにしています:

  def observeFuturesProgress(futs: List[Future[Int]]) : Observable[String] = {
    Observable[String](observer => {
      val loudFutures: List[Future[Int]] = futs.map(f => {
            f onComplete {
              case Success(a) => observer.onNext(s"just did $a more")
              case Failure(e) => observer.onError(e)
            }
            f
        })
      Future.sequence(loudFutures) onComplete {
          case Success(_) => observer.onCompleted()
          case Failure(e) => observer.onError(e)
      }
    })
  }

これは私のテスト環境では問題なく動作します。onNextしかし、少なくとも重複する呼び出しがないことに注意せずに、異なるスレッドから呼び出すべきではないことを読んだところです。これを修正するための推奨される方法は何ですか? の多くの実際の使用でObservablesは、このような非同期コードから呼び出す必要onNextがあるようですが、ドキュメントで同様の例を見つけることができません。

4

1 に答える 1

1

観察可能なコントラクト

オブザーバブルは、オブザーバーに連続して (並列ではなく) 通知を発行する必要があります。これらの通知は異なるスレッドから発行される場合がありますが、通知間に正式な事前発生関係が存在する必要があります。

シリアライズを見てみよう

Observable がそのオブザーバーのメソッドを非同期的に (おそらく異なるスレッドから) 呼び出すことができます。これにより、そのような Observable が Observable 契約に違反する可能性があり、OnNext 通知の 1 つの前に OnCompleted または OnError 通知を送信しようとしたり、2 つの異なるスレッドから OnNext 通知を同時に送信したりする可能性があります。Serialize オペレーターを適用することで、そのような Observable を適切に動作させ、同期させることができます。

于 2016-03-13T02:28:38.550 に答える