編集:
私が知る限り、あなたの解決策は正しいです。パフォーマンスを少し改善したい場合は、次のことを検討してください。
val observableOut: Observable[Out] = observableIn.flatMap { in =>
val p = Promise.successful(in)
Observable.from(futureInToFutureOut(p.future))
}
Future.apply
のように未来をマッピングするための非同期計算を作成しないため、これは少し高速です。
年:
古い提案を以下に残します。これは、Observable
.
import scala.concurrent._
val promiseIn = Promise[Int]()
observableIn.foreach(x => promiseIn.trySuccess(x))
val observableOut = Observable.create { observer =>
promiseIn.future.map(futureInToFutureOut).foreach { y =>
observer.onNext(y)
observer.onCompleted()
}
}
説明
オブジェクト (つまり、イベント ストリーム)から始めているので、そこからイベントを未来Observable[In]
に転送する方法を見つける必要があります。Observable
新しいフューチャを作成する一般的な方法は、フューチャ オブジェクトの入力側であるPromise
最初のフューチャを作成することです。次にon を使用して、最初のイベントが到着したときに futureを呼び出します。foreach
Observable
trySuccess
observableIn ---x---> foreach ---x---> promiseIn.trySuccess(x)
のイベントがObservable
到着すると、Promise は非同期的に完了します。これで、プロミスの読み取り側、つまりfuture
メソッドを呼び出すことで未来を取得できます。そしてmap
未来を呼ぶ―― promiseIn.future.map(futureInToFutureOut)
。グラフィカルに:
promiseIn.future ---x---> map ---f(x)---> futureOut
結果のフューチャは で非同期に完了しfutureInToFutureOut(x)
ます。この時点で、 を介してこの値を返す方法を見つける必要がありますObservable[Out]
。new を作成する一般的な方法は、ファクトリ メソッドObservable
を呼び出すことです。Observable.create
このメソッドは、Observable
の書き込み終了として、Observer
を呼び出してイベントを発行するために使用する を提供しますonNext
。
futureOut ---f(x)---> foreach ---f(x)---> observer.onNext(f(x))
future は最大で 1 つのイベントしか発行しないことがわかっているためonCompleted
、オブザーバーで を呼び出して output を閉じますObservable
。
編集: Rx と Scala の将来をマスターしたい場合は、これらのトピックを扱っているこの本を検討することをお勧めします。免責事項: 私は著者です。