3

何らかの理由で、私の Akka ストリームは、最初のメッセージを「発行」(?) する前に、常に 2 番目のメッセージを待ちます。

これが私の問題を示すサンプルコードです。

val rx = Source((1 to 100).toStream.map { t =>
  Thread.sleep(1000)
  println(s"doing $t")
  t
})
rx.runForeach(println)

出力が得られます:

doing 1
doing 2
1
doing 3
2
doing 4
3
doing 5
4
doing 6
5
...

私が欲しいもの:

doing 1
1
doing 2
2
doing 3
3
doing 4
4
doing 5
5
doing 6
6
...
4

2 に答える 2

5

コードが現在設定されている方法では、Source要素を下流に放出し始めることが許可される前に、 を完全に変換しています。toStreamソースを表す数値の範囲を削除することで、(@slouc が述べたように) その動作を明確に確認できます。Sourceこれを行うと、ダウンストリームの要求に応答し始める前に、最初に が完全に変換されることがわかります。Source実際に aを aに実行し、途中で変換ステップを実行したい場合はSink、次のような構造を試すことができます。

val transform =
  Flow[Int].map{ t =>
    Thread.sleep(1000)
    println(s"doing $t")
    t
  }

Source((1 to 100).toStream).
  via(transform ).
  to(Sink.foreach(println)).
  run

この変更を行うと、目的の効果が得られます。つまり、下流に流れる要素は、次の要素の処理が開始される前に、フロー全体で処理されます。

于 2016-07-10T19:06:56.550 に答える
1

.toStream()コレクション全体が遅延していることを意味します。それがなければ、出力は最初の 100 の「実行中」の後に 1 から 100 までの数字が続きます。しかし、Stream最初の要素のみを評価し、「実行中 1」の出力を与え、そこで停止します。次の要素は、必要に応じて評価されます。

現在、ドキュメントでこれに関する詳細を見つけることができませんでしたがrunForeach、現在の要素で関数を呼び出す前に次の要素を取る実装があると思います。そのため、要素 nを呼び出す前にprintln、まず要素 n+1 を調べます (たとえば、存在するかどうかを確認します)。その結果、"doing n+1" メッセージが表示されます。次にprintln、現在の要素で関数を実行し、メッセージ "n" を生成します。

あなたは本当にあなたのmap()前にいる必要がありますrunForeachか?つまり、データを 2 回移動する必要がありますか? 私はおそらく明白なことを述べていることを知っていますが、データを一度に処理するだけなら、次のようになります。

val rx = Source((1 to 100).toStream)
rx.runForeach({ t =>
  Thread.sleep(1000)
  println(s"doing $t")
  // do something with 't', which is now equal to what "doing" says
})

そうすれば、いつ評価されるかという問題はありません。

于 2016-07-10T18:32:38.720 に答える