私が達成しようとしているのは、akka ストリームを使用した同期フィードバック ループのようなものを実装することです。
を持っているとしましょうFlow[Int].filter(_ % 5 == 0)
。のストリームInt
をこのフローにブロードキャストし、タプルをその直後に圧縮すると、次のような結果が得られます
(0,0)
(5,1)
(10,2)
Option[Int]
次の要素をプッシュした後にフローが要素を放出したかどうかを示すを放出する方法はありますか?
(Some(0),0)
(None, 1)
(None, 2)
(None, 3)
(None, 4)
(Some(5), 5)
(None, 6)
...
DetachedStage
フローが前Flow
の段階で引っ張られるたびに、私は彼が次の要素を必要とすることを知っていました。背後のステージが要素を受け取らなかった場合、それは None でした。
残念ながら、結果は良くなく、多くの順位がずれています。
補足事項
フィルタ フローは単なる例です。これは非常に長いフローになる可能性があり、そのフローのOption
すべての段階で を発行する機能を提供することはできません。そのため、フローが次をプッシュしたかどうかを実際に知る必要があります。代わりにダウンストリームから次をリクエストしました
と も試してみましたがconflate
、expand
これらは結果の位置オフセットでさらに悪化しています
構成で変更したことの 1 つは、フローのinitial
andmax
バッファーでした。これにより、示された要求が実際にプッシュした要素の後にあることを確認できます。
この問題を解決する方法についていくつかの提案をいただければ幸いです。