2

2 つの部分からなる質問があるので、まず背景を説明させてください。私は、次のように私が望むものと同様のことを行うことが可能であることを知っています:

import scalaz.concurrent._
import scalaz.stream._

val q = async.unboundedQueue[Int]
val p: Process[Task, Int] = q.dequeue

q.enqueueAll(1 to 2).run

val p1: Process1[Int, Int] = process1.take(1)

p.pipe(p1).map(x => println(s"Answer: $x")).run.run
// Answer: 1

p.pipe(p1).map(x => println(s"Answer: $x")).run.run
// Answer: 2

p.pipe(p1).map(x => println(s"Answer: $x")).run.run
// hangs awaiting next input

p1ハングせずに以下の出力が得られるように使用できる他のものはありますか(のようになりますprocess1.awaitOption)?

Answer: Some(1)
Answer: Some(2)
Answer: None

はいの場合、次の質問に答えるのは簡単だと思います。p1ハングせずに以下の出力が得られるように使用できる他のものはありますか(のようになりますprocess1.chunkAll)?

Answer: Seq(1, 2)
Answer: Seq()
Answer: Seq()

編集:

質問を補足して、より理解しやすくします。次のようなループがある場合:

for (i <- 1 to 4) {
  p.pipe(p1).map(x => println(s"Answer: $x")).run.run
}

結果は次のようになります。

Answer: Seq()
// if someone pushes some values into the queue, like: q.enqueueAll(1 to 2).run
Answer: Seq(1, 2)
Answer: Seq()
Answer: Seq()

私がやろうとしていることは今、はっきりしていると思います。問題は、ループを制御できないことです。キューに値がない場合はループをブロックしてはなりません。

4

2 に答える 2

1

あなたがしようとしているセマンティクスを理解しているかどうかはわかりませんが、一般的に、キューを外部で閉じるか、Y を使用することによって、プロセスが中断される (つまり、何らかの値を待つためにキャンセルされる) 可能性があります。割り込み。

次のエンキューされた値を待つ代わりに、プロセスを終了させたい場合は? これを空のキューに入れたいとしましょう。「サイズ」プロセスがあり、サイズが空の場合、それを使用して待機中のキューを中断できます。次のようになります。

val empty : Process[Task,Boolean] = q.size.continuous.map(_ <= 0)

val deq : Process[Task,Int] = empty.wye(q.enqueue)(wye.interrupt)
于 2015-04-20T04:19:43.343 に答える