8

scalaz-stream を使用してストリームを分割/フォークしてから再結合することは可能ですか?

例として、次の関数があるとしましょう

val streamOfNumbers : Process[Task,Int] = Process.emitAll(1 to 10)

val sumOfEvenNumbers = streamOfNumbers.filter(isEven).fold(0)(add)
val sumOfOddNumbers  = streamOfNumbers.filter(isOdd).fold(0)(add)

zip( sumOfEven, sumOfOdd ).to( someEffectfulFunction )

scalaz-stream を使用すると、この例では期待どおりの結果が得られます - 1 から 10 までの数値のタプルがシンクに渡されます。

streamOfNumbersただし、 IO を必要とするものに置き換えると、実際には IO アクションが 2 回実行されます。

を使用するTopicと、ストリーム内の要素を正しく複製する pub/sub プロセスを作成できますが、バッファリングは行われません。シンクが消費するペースに関係なく、ソース全体をできるだけ速く消費するだけです。

これを制限された Queue でラップすることはできますが、最終結果は必要以上に複雑に感じられます。

ソースから IO アクションを複製せずに scalaz-stream でストリームを分割する簡単な方法はありますか?

4

3 に答える 3

6

また、「分割」要件で前の回答 delas を明確にするために。特定の問題の解決策は、ストリームを分割する必要がない場合があります。

val streamOfNumbers : Process[Task,Int] = Process.emitAll(1 to 10)
val oddOrEven: Process[Task,Int\/Int] = streamOfNumbers.map {
   case even if even % 2 == 0 => right(even)
   case odd => left(odd)
} 
val summed = oddOrEven.pipeW(sump1).pipeO(sump1)

val evenSink: Sink[Task,Int] = ???
val oddSink: Sink[Task,Int] = ???

summed
.drainW(evenSink)
.to(oddSink)
于 2014-12-18T08:25:10.060 に答える
2

おそらくまだトピックを使用して、トピックにプッシュする前に子プロセスがサブスクライブすることを保証することができます。

ただし、このソリューションには制限がないことに注意してください。つまり、プッシュが速すぎると、OOM エラーが発生する可能性があります。

def split[A](source:Process[Task,A]): Process[Task,(Process[Task,A], Proces[Task,A])]] = {
  val topic = async.topic[A]

  val sub1 = topic.subscribe
  val sub2 = topic.subscribe

  merge.mergeN(Process(emit(sub1->sub2),(source to topic.publish).drain))
}
于 2014-12-18T08:16:13.880 に答える
0

同様に、この機能が必要でした。私の状況はかなりトリッキーで、この方法で回避することはできませんでした。

このスレッドでの Daniel Spiewak の応答のおかげで、次の作業を行うことができました。追加することで彼のソリューションを改善しonHalt、完了したらアプリケーションが終了するようにProcessしました。

def split[A](p: Process[Task, A], limit: Int = 10): Process[Task, (Process[Task, A], Process[Task, A])] = {
  val left = async.boundedQueue[A](limit)
  val right = async.boundedQueue[A](limit)

  val enqueue = p.observe(left.enqueue).observe(right.enqueue).drain.onHalt { cause =>
    Process.await(Task.gatherUnordered(Seq(left.close, right.close))){ _ => Halt(cause) }
  }
  val dequeue = Process((left.dequeue, right.dequeue))

  enqueue merge dequeue
}
于 2016-09-02T21:53:23.267 に答える