14

Stream[X]と を組み合わせてにfunction X => Future Yしたい状況に陥ることがありますがFuture[Stream[Y]]、それを行う方法が見つからないようです。たとえば、私は

val x = (1 until 10).toStream
def toFutureString(value : Integer) = Future(value toString)

val result : Future[Stream[String]] = ???

私は試した

 val result = Future.Traverse(x, toFutureString)

これは正しい結果をもたらしますが、Future を返す前にストリーム全体を消費しているように見えます。

私は試した

val result = x.flatMap(toFutureString)

しかし、それはコンパイルされませんtype mismatch; found : scala.concurrent.Future[String] required: scala.collection.GenTraversableOnce[?]

val result = x.map(toFutureString)

やや奇妙で役に立たないStream[Future[String]]

問題を解決するにはどうすればよいですか?

編集:私は にこだわっていません。ヘッドの処理を開始する前にすべてのアイテムの評価をブロックしない限り、 でStream同じ操作を行うことにも同様に満足しています。Iterator

Edit2: Future.Traverse コンストラクトが Future[Stream] を返す前にストリーム全体をトラバースする必要があることを 100% 確信しているわけではありませんが、そうなっていると思います。そうでない場合、それ自体は素晴らしい答えです。

Edit3: また、結果が順序どおりである必要はありません。返されたストリームまたはイテレータがどのような順序であっても問題ありません。

4

3 に答える 3

9

で正しい軌道に乗っていますtraverseが、残念ながら、この場合、標準ライブラリの定義が少し壊れているようです。戻る前にストリームを消費する必要はありません。

Future.traverse「トラバース可能な」タイプにラップされた任意のアプリケーションファンクターで機能する、より一般的な関数の特定のバージョンです(たとえば、これらの 論文または詳細については、こちらの回答を参照してください)。

Scalazライブラリはこのより一般的なバージョンを提供し、この場合は期待どおりに動作します ( Futurefromのアプリケーション ファンクター インスタンスを取得していることに注意してくださいscalaz-contrib。これは、まだ Scala 2.9 に対してクロスビルドされている Scalaz の安定バージョンにはまだ含まれていません)。 .2、これはありませんFuture):

import scala.concurrent._
import scalaz._, Scalaz._, scalaz.contrib.std._

import ExecutionContext.Implicits.global

def toFutureString(value: Int) = Future(value.toString)

val result: Future[Stream[String]] = Stream.from(0) traverse toFutureString

これは無限ストリームですぐに返されるため、最初に消費されていないことが確実にわかります。


脚注として:のソースFuture.traverse見ると、 の観点から実装されていることがわかります。foldLeftこれは便利ですが、ストリームの場合は必要または適切ではありません。

于 2013-08-04T15:14:15.837 に答える
0

現在のバージョンの Scalaztraverse()は動作が異なり、呼び出し時にストリーム全体を消費しようとするため、受け入れられた回答はもはや有効ではありません。

質問に関しては、これを真にノンブロッキングな方法で実現することは不可能だと思います。

Future[Stream[Y]]が利用可能になるまで解決できませんStream[Y]。また、は、トラバースするときにブロックしないと取得できないY関数によって非同期に生成されるためです。これは、解決する前にすべてを解決する必要がある (ストリーム全体を消費する必要がある) か、トラバース中にブロックが発生することを許可する必要がある (基礎となる先物がまだ完了していないアイテムで) ことを意味します。しかし、トラバースのブロックを許可すると、結果として生じる未来の完了の定義はどうなるでしょうか? その観点からすると、 と同じかもしれません。これは意味的には元の と同じです。X => Future[Y]YStream[Y]Future[Y]Future[Stream[Y]]Stream[Y]Future.successful(BlockingStream[Y])Stream[Future[Y]]

つまり、質問自体に問題があると思います。

于 2018-12-13T12:30:29.280 に答える