3

私が達成しようとしているのは、akka ストリームを使用した同期フィードバック ループのようなものを実装することです。

を持っているとしましょうFlow[Int].filter(_ % 5 == 0)。のストリームIntをこのフローにブロードキャストし、タプルをその直後に圧縮すると、次のような結果が得られます

(0,0)
(5,1)
(10,2)

Option[Int]次の要素をプッシュした後にフローが要素を放出したかどうかを示すを放出する方法はありますか?

(Some(0),0)
(None, 1)
(None, 2)
(None, 3)
(None, 4)
(Some(5), 5)
(None, 6)
...

DetachedStageフローが前Flowの段階で引っ張られるたびに、私は彼が次の要素を必要とすることを知っていました。背後のステージが要素を受け取らなかった場合、それは None でした。

残念ながら、結果は良くなく、多くの順位がずれています。

補足事項

フィルタ フローは単なる例です。これは非常に長いフローになる可能性があり、そのフローのOptionすべての段階で を発行する機能を提供することはできません。そのため、フローが次をプッシュしたかどうかを実際に知る必要があります。代わりにダウンストリームから次をリクエストしました

と も試してみましたがconflateexpandこれらは結果の位置オフセットでさらに悪化しています

構成で変更したことの 1 つは、フローのinitialandmaxバッファーでした。これにより、示された要求が実際にプッシュした要素の後にあることを確認できます。

この問題を解決する方法についていくつかの提案をいただければ幸いです。

4

1 に答える 1