0

eaを処理したいです。クエリ フェッチ (クエリごとに複数のフェッチが行われる可能性があります) を非同期的に行います。これを行うために、処理関数 ( a を返すFuture) をクエリ メソッドに渡して、それを呼び出します。フェッチ。クエリの結果サイズは事前にわかりません。フェッチの最大サイズしか知りません。したがって、私のクエリは(事前にサイズを知る必要がObservableある for ex. とは対照的に) を返します。List唯一の問題は、Observable createorを使用すると、次の onNext を呼び出す前にapply、 my が完了するまで内部的にブロックされることです。事実上、先物から得たいと思っていたパフォーマンスの向上が失われます。Futureファクトリ メソッドはObservable fromブロックしませんが、Iterable. 私はそれをミュータブルに渡すことができますIterable新しいフェッチが入ると成長します。誰かがより参照透過的なsol'nを持っていますか? コードは次のとおりです。

object Repository {
  def query(fetchSize: Int)(f: Set[Int] => Future[Set[Int]]): Observable[Future[Set[Int]]] = {
    // observable (as opposed to list) because modeling a process 
    // where the total result size is unknown beforehand. 
    // Also, not creating or applying because it blocks the futures
    val mut = scala.collection.mutable.Set[Future[Set[Int]]]()
    val obs = Observable.from(mut)
    1 to 2100 by fetchSize foreach { i =>
      mut += f(DataSource.fetch(i, fetchSize))
    }
    obs
  }
}
4

1 に答える 1

0

foldLeft を使用して可変性を取り除くことができました。

(1 to 21 by fetchSize).foldLeft(Observable just Future((Set[Int]()))) { (obs, i) =>
  obs + f(DataSource.fetch(i)())
}

どこ:

implicit class FutureObservable(obs: Observable[Future[Set[Int]]]) {
  def +(future: Future[Set[Int]]) =
  obs merge (Observable just future)
}

Observable唯一のことは、コンパイラが不満を言わなかった空の作成をしなければならなかったことが好きではないということです。誰かがより良い答えを持っている場合は、投稿してください。マークします。

于 2015-11-22T00:52:07.470 に答える