scalaz-stream を使用してストリームを分割/フォークしてから再結合することは可能ですか?
例として、次の関数があるとしましょう
val streamOfNumbers : Process[Task,Int] = Process.emitAll(1 to 10)
val sumOfEvenNumbers = streamOfNumbers.filter(isEven).fold(0)(add)
val sumOfOddNumbers = streamOfNumbers.filter(isOdd).fold(0)(add)
zip( sumOfEven, sumOfOdd ).to( someEffectfulFunction )
scalaz-stream を使用すると、この例では期待どおりの結果が得られます - 1 から 10 までの数値のタプルがシンクに渡されます。
streamOfNumbers
ただし、 IO を必要とするものに置き換えると、実際には IO アクションが 2 回実行されます。
を使用するTopic
と、ストリーム内の要素を正しく複製する pub/sub プロセスを作成できますが、バッファリングは行われません。シンクが消費するペースに関係なく、ソース全体をできるだけ速く消費するだけです。
これを制限された Queue でラップすることはできますが、最終結果は必要以上に複雑に感じられます。
ソースから IO アクションを複製せずに scalaz-stream でストリームを分割する簡単な方法はありますか?