2 つの部分からなる質問があるので、まず背景を説明させてください。私は、次のように私が望むものと同様のことを行うことが可能であることを知っています:
import scalaz.concurrent._
import scalaz.stream._
val q = async.unboundedQueue[Int]
val p: Process[Task, Int] = q.dequeue
q.enqueueAll(1 to 2).run
val p1: Process1[Int, Int] = process1.take(1)
p.pipe(p1).map(x => println(s"Answer: $x")).run.run
// Answer: 1
p.pipe(p1).map(x => println(s"Answer: $x")).run.run
// Answer: 2
p.pipe(p1).map(x => println(s"Answer: $x")).run.run
// hangs awaiting next input
p1
ハングせずに以下の出力が得られるように使用できる他のものはありますか(のようになりますprocess1.awaitOption
)?
Answer: Some(1)
Answer: Some(2)
Answer: None
はいの場合、次の質問に答えるのは簡単だと思います。p1
ハングせずに以下の出力が得られるように使用できる他のものはありますか(のようになりますprocess1.chunkAll
)?
Answer: Seq(1, 2)
Answer: Seq()
Answer: Seq()
編集:
質問を補足して、より理解しやすくします。次のようなループがある場合:
for (i <- 1 to 4) {
p.pipe(p1).map(x => println(s"Answer: $x")).run.run
}
結果は次のようになります。
Answer: Seq()
// if someone pushes some values into the queue, like: q.enqueueAll(1 to 2).run
Answer: Seq(1, 2)
Answer: Seq()
Answer: Seq()
私がやろうとしていることは今、はっきりしていると思います。問題は、ループを制御できないことです。キューに値がない場合はループをブロックしてはなりません。