0

このような問題を解決するために、scalaz-streams を使用できる (すべきか?) という予感があります。

開始項目 A があります。A を受け取り、A のリストを返す関数があります。

def doSomething(a : A) : List[A]

1 つのアイテム (開始アイテム) で始まるワーク キューがあります。各アイテムを処理 ( doSomething) すると、同じワーク キューの最後に多くのアイテムが追加される場合があります。ただし、ある時点 (何百万ものアイテムの後) に、後続の各アイテムdoSomethingが作業キューに追加するアイテムの数が減り始め、最終的には新しいアイテムが追加されなくなります (doSomething はこれらのアイテムに対して Nil を返します)。これにより、計算が最終的に終了することがわかります。

scalaz-streams がこれに適していると仮定すると、これを実装するためにどの全体的な構造または型を調べる必要があるかについてのヒントを教えてください。

単一の「ワーカー」を使用した単純な実装が完了したら、複数のワーカーを使用してキュー アイテムを並行して処理したいと考えていますdoSomething。そのため、このアルゴリズムでも効果 (ワーカーの失敗など) を処理する必要があります。

4

1 に答える 1

2

では、「どのように?」に対する答えです。は :

import scalaz.stream._
import scalaz.stream.async._
import Process._

def doSomething(i: Int) = if (i == 0) Nil else List(i - 1)

val q = unboundedQueue[Int]
val out = unboundedQueue[Int]

q.dequeue
 .flatMap(e => emitAll(doSomething(e)))
 .observe(out.enqueue)
 .to(q.enqueue).run.runAsync(_ => ()) //runAsync can process failures, there is `.onFailure` as well

q.enqueueAll(List(3,5,7)).run
q.size.continuous
 .filter(0==)
 .map(_ => -1)
 .to(out.enqueue).once.run.runAsync(_ => ()) //call it only after enqueueAll

import scalaz._, Scalaz._
val result = out
  .dequeue
  .takeWhile(_ != -1)
  .map(_.point[List])
  .foldMonoid.runLast.run.get //run synchronously

結果:

result: List[Int] = List(2, 4, 6, 1, 3, 5, 0, 2, 4, 1, 3, 0, 2, 1, 0)

ただし、次のことに気付くかもしれません。

1) 終了の問題を解決しなければならなかった。akka-stream についても同じ問題があり、キューにアクセスできず、高速リーダーのためにキューが空にならないことを保証する自然な背圧がないため、そこで解決するのははるかに困難です。

List2)計算の最後に作業中のキューが空になるため、出力用に別のキューを導入する(そしてそれをに変換する)必要がありました。

したがって、どちらのライブラリもそのような要件 (有限ストリーム) にはあまり適合していませんが、scalaz-stream (scalaz の依存関係を取り除いた後に "fs2" になる予定です) は、アイデアを実装するのに十分柔軟です。それについての大きな「しかし」は、デフォルトで順次実行されることです。高速化するには、(少なくとも) 2 つの方法があります。

1) doSomething をステージに分割し、.flatMap(doSomething1).flatMap(doSomething2).map(doSomething3)その間に別のキューを配置します (ステージにかかる時間が等しい場合、約 3 倍速くなります)。

2) キュー処理を並列化します。Akka にはmapAsyncそのための機能がありmapます。自動的に s を並列処理できます。Scalaz-stream にはチャンクがあります - q を例えば 5 のチャンクにグループ化し、チャンク内の各要素を並行して処理できます。とにかく、両方のソリューション (akka と scalaz) は、入力と出力の両方として 1 つのキューを使用するのにはあまり適していません。

しかし、繰り返しになりますが、古典的な単純な方法があるため、それはあまりにも複雑で無意味です。

@tailrec def calculate(l: List[Int], acc: List[Int]): List[Int] = 
  if (l.isEmpty) acc else { 
    val processed = l.flatMap(doSomething) 
    calculate(processed, acc ++ processed) 
  }

scala> calculate(List(3,5,7), Nil)
res5: List[Int] = List(2, 4, 6, 1, 3, 5, 0, 2, 4, 1, 3, 0, 2, 1, 0)

そして、これが並列化されたものです:

@tailrec def calculate(l: List[Int], acc: List[Int]): List[Int] = 
  if (l.isEmpty) acc else { 
    val processed = l.par.flatMap(doSomething).toList
    calculate(processed, acc ++ processed) 
  }

scala> calculate(List(3,5,7), Nil)
res6: List[Int] = List(2, 4, 6, 1, 3, 5, 0, 2, 4, 1, 3, 0, 2, 1, 0)

ですから、scalaz-stream も akka-streams もあなたの要件には合わないと思います。ただし、従来の scala 並列コレクションは完全に適合します。

複数の JVM にまたがる分散計算が必要な場合は、Apache Spark を見てください。その scala-dsl は同じ map/flatMap/fold スタイルを使用しています。これにより、JVM のメモリに収まらない大きなコレクションを (JVM 間でスケーリングすること@tailrec def calculateにより) 操作できるようになるため、 List. また、内部で障害を処理する手段も提供しますdoSomething

PS では、このようなタスクにストリーミング ライブラリを使用するという考えが気に入らない理由を次に示します。ストリーミングは、事前定義された (大きな) データの計算ではなく、いくつかの外部システム (HttpRequests など) からの無限のストリームに似ています。

PS2 リアクティブのような (ブロッキングなし) が必要な場合は、Future(またはscalaz.concurrent.Task) +を使用できますFuture.sequence

于 2015-12-29T05:52:26.890 に答える