3

type の要素のイベント ストリームがあるとしますIn

val observableIn: Observable[In] = ???

Intype のオブジェクトを type のオブジェクトに変換する関数ですOutが、「将来的には」:

val futureInToFutureOut: (Future[In]) => Future[Out] = ???

この時点で、observableInmy function に従ってmy の要素を変換したいと思いますfutureInToFutureOut。つまり、結果としてOut、元のストリームの要素と一致する type の要素のイベント ストリームが必要ですが、 function を介して変換されますfutureInToFutureOut

私はこれがうまくいくと思います:

val observableOut: Observable[Out] = observableIn flatMap { in =>
  Observable.from(futureInToFutureOut(Future(in)))
}

これは正しいですか?これを行うより良い方法はありますか?

4

1 に答える 1

2

編集:

私が知る限り、あなたの解決策は正しいです。パフォーマンスを少し改善したい場合は、次のことを検討してください。

val observableOut: Observable[Out] = observableIn.flatMap { in =>
  val p = Promise.successful(in)
  Observable.from(futureInToFutureOut(p.future))
}

Future.applyのように未来をマッピングするための非同期計算を作成しないため、これは少し高速です。

年:

古い提案を以下に残します。これは、Observable.

import scala.concurrent._
val promiseIn = Promise[Int]()
observableIn.foreach(x => promiseIn.trySuccess(x))
val observableOut = Observable.create { observer =>
  promiseIn.future.map(futureInToFutureOut).foreach { y =>
    observer.onNext(y)
    observer.onCompleted()
  }
}

説明

オブジェクト (つまり、イベント ストリーム)から始めているので、そこからイベントを未来Observable[In]に転送する方法を見つける必要があります。Observable新しいフューチャを作成する一般的な方法は、フューチャ オブジェクトの入力側であるPromise最初のフューチャを作成することです。次にon を使用して、最初のイベントが到着したときに futureを呼び出します。foreachObservabletrySuccess

observableIn ---x---> foreach ---x---> promiseIn.trySuccess(x)

のイベントがObservable到着すると、Promise は非同期的に完了します。これで、プロミスの読み取り側、つまりfutureメソッドを呼び出すことで未来を取得できます。そしてmap未来を呼ぶ―― promiseIn.future.map(futureInToFutureOut)。グラフィカルに:

promiseIn.future ---x---> map ---f(x)---> futureOut

結果のフューチャは で非同期に完了しfutureInToFutureOut(x)ます。この時点で、 を介してこの値を返す方法を見つける必要がありますObservable[Out]。new を作成する一般的な方法は、ファクトリ メソッドObservableを呼び出すことです。Observable.createこのメソッドは、Observableの書き込み終了として、Observerを呼び出してイベントを発行するために使用する を提供しますonNext

futureOut ---f(x)---> foreach ---f(x)---> observer.onNext(f(x))

future は最大で 1 つのイベントしか発行しないことがわかっているためonCompleted、オブザーバーで を呼び出して output を閉じますObservable

編集: Rx と Scala の将来をマスターしたい場合は、これらのトピックを扱っているこの本を検討することをお勧めします。免責事項: 私は著者です。

于 2015-07-13T18:14:59.410 に答える