1

があり、各ストリームで実行時間が瞬時から非常に長いProcess[Task, A]関数を実行して、 を生成する必要があります。A => BAProcess[Task, B]

問題は、 s が受信された順序に関係なく、それぞれAを an でできるだけ早く処理し、結果が得られたらすぐに渡したいということです。ExecutionContextA

具体的な例は次のコードです。ここでは、すべての奇数がすぐに出力され、偶数が約 500 ミリ秒後に出力されることを願っています。代わりに発生するのは、(奇数、偶数) カップルが出力され、500 ミリ秒の一時停止でインターリーブされることです。

import java.util.concurrent.{TimeUnit, Executors}
import scala.concurrent.ExecutionContext

import scalaz.stream._
import scalaz.concurrent.Task

object Test extends App {
  val executor = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(8))

  Process.range(0, 100).flatMap { i =>
    Process.eval(Task.apply {
      if(i % 2 == 0) Thread.sleep(500)
      i
    }(executor))
  }.to(io.printStreamSink(System.out)(_ println _))
  .run.run

  executor.shutdown()
  executor.awaitTermination(10, TimeUnit.MINUTES)
}
4

1 に答える 1

1

答えはチャネルを使用することです。これは、私が望むことを正確に行うように見える更新されたコードです。

import java.util.concurrent.{TimeUnit, Executors}
import scala.concurrent.ExecutionContext

import scalaz.stream._
import scalaz.concurrent.Task

object Test extends App {
  val executor = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(8))
  val chan = channel.lift[Task, Int, Int] { i => Task {
    if(i % 2 == 0) Thread.sleep(500)
    i
  }}

  merge.mergeN(8)(Process.range(0, 100).zipWith(chan)((i, f) => Process.eval(f(i))))
    .to(io.printStreamSink(System.out)(_ println _)).run.run

  executor.shutdown()
  executor.awaitTermination(10, TimeUnit.MINUTES)
}
于 2015-08-11T10:22:25.530 に答える