では、「どのように?」に対する答えです。は :
import scalaz.stream._
import scalaz.stream.async._
import Process._
def doSomething(i: Int) = if (i == 0) Nil else List(i - 1)
val q = unboundedQueue[Int]
val out = unboundedQueue[Int]
q.dequeue
.flatMap(e => emitAll(doSomething(e)))
.observe(out.enqueue)
.to(q.enqueue).run.runAsync(_ => ()) //runAsync can process failures, there is `.onFailure` as well
q.enqueueAll(List(3,5,7)).run
q.size.continuous
.filter(0==)
.map(_ => -1)
.to(out.enqueue).once.run.runAsync(_ => ()) //call it only after enqueueAll
import scalaz._, Scalaz._
val result = out
.dequeue
.takeWhile(_ != -1)
.map(_.point[List])
.foldMonoid.runLast.run.get //run synchronously
結果:
result: List[Int] = List(2, 4, 6, 1, 3, 5, 0, 2, 4, 1, 3, 0, 2, 1, 0)
ただし、次のことに気付くかもしれません。
1) 終了の問題を解決しなければならなかった。akka-stream についても同じ問題があり、キューにアクセスできず、高速リーダーのためにキューが空にならないことを保証する自然な背圧がないため、そこで解決するのははるかに困難です。
List
2)計算の最後に作業中のキューが空になるため、出力用に別のキューを導入する(そしてそれをに変換する)必要がありました。
したがって、どちらのライブラリもそのような要件 (有限ストリーム) にはあまり適合していませんが、scalaz-stream (scalaz の依存関係を取り除いた後に "fs2" になる予定です) は、アイデアを実装するのに十分柔軟です。それについての大きな「しかし」は、デフォルトで順次実行されることです。高速化するには、(少なくとも) 2 つの方法があります。
1) doSomething をステージに分割し、.flatMap(doSomething1).flatMap(doSomething2).map(doSomething3)
その間に別のキューを配置します (ステージにかかる時間が等しい場合、約 3 倍速くなります)。
2) キュー処理を並列化します。Akka にはmapAsync
そのための機能がありmap
ます。自動的に s を並列処理できます。Scalaz-stream にはチャンクがあります - q を例えば 5 のチャンクにグループ化し、チャンク内の各要素を並行して処理できます。とにかく、両方のソリューション (akka と scalaz) は、入力と出力の両方として 1 つのキューを使用するのにはあまり適していません。
しかし、繰り返しになりますが、古典的な単純な方法があるため、それはあまりにも複雑で無意味です。
@tailrec def calculate(l: List[Int], acc: List[Int]): List[Int] =
if (l.isEmpty) acc else {
val processed = l.flatMap(doSomething)
calculate(processed, acc ++ processed)
}
scala> calculate(List(3,5,7), Nil)
res5: List[Int] = List(2, 4, 6, 1, 3, 5, 0, 2, 4, 1, 3, 0, 2, 1, 0)
そして、これが並列化されたものです:
@tailrec def calculate(l: List[Int], acc: List[Int]): List[Int] =
if (l.isEmpty) acc else {
val processed = l.par.flatMap(doSomething).toList
calculate(processed, acc ++ processed)
}
scala> calculate(List(3,5,7), Nil)
res6: List[Int] = List(2, 4, 6, 1, 3, 5, 0, 2, 4, 1, 3, 0, 2, 1, 0)
ですから、scalaz-stream も akka-streams もあなたの要件には合わないと思います。ただし、従来の scala 並列コレクションは完全に適合します。
複数の JVM にまたがる分散計算が必要な場合は、Apache Spark を見てください。その scala-dsl は同じ map/flatMap/fold スタイルを使用しています。これにより、JVM のメモリに収まらない大きなコレクションを (JVM 間でスケーリングすること@tailrec def calculate
により) 操作できるようになるため、 List
. また、内部で障害を処理する手段も提供しますdoSomething
。
PS では、このようなタスクにストリーミング ライブラリを使用するという考えが気に入らない理由を次に示します。ストリーミングは、事前定義された (大きな) データの計算ではなく、いくつかの外部システム (HttpRequests など) からの無限のストリームに似ています。
PS2 リアクティブのような (ブロッキングなし) が必要な場合は、Future
(またはscalaz.concurrent.Task
) +を使用できますFuture.sequence