私は Scalaz 7 iteratee を使用して、一定のヒープ空間で大量の (つまり、制限のない) データ ストリームを処理してきました。
コードでは、次のようになります。
type ErrorOrT[M[+_], A] = EitherT[M, Throwable, A]
type ErrorOr[A] = ErrorOrT[IO, A]
def processChunk(c: Chunk): Result
def process(data: EnumeratorT[Chunk, ErrorOr]): IterateeT[Chunk, ErrorOr, List[Result]] =
Iteratee.fold[Chunk, ErrorOr, List[Result]](Nil) { (rs, c) =>
processChunk(c) :: rs
} &= data
ここで、一度にP個のデータ チャンクを処理しながら、並列処理を実行したいと考えています。それでもヒープ領域を制限する必要がありますが、データのPチャンクと計算の累積結果を格納するのに十分なヒープがあると想定するのは合理的です。
私はTask
クラスを認識しており、列挙子をマッピングしてタスクのストリームを作成することを考えています。
data map (c => Task.delay(processChunk(c)))
しかし、非決定性を管理する方法はまだわかりません。ストリームを消費している間、可能な限りPタスクが実行されていることを確認するにはどうすればよいですか?
初挑戦:
私が最初に試した解決策は、ストリームを折りたたんで、Future
各チャンクを処理する Scala を作成することでした。しかし、プログラムは GC オーバーヘッド エラーで爆発しました (おそらく、すべてのFuture
s を作成しようとしたときに、すべてのチャンクをメモリにプルしていたためです)。代わりに、反復対象は、既にP個のタスクが実行されているときに入力の消費を停止し、それらのタスクのいずれかが終了したときに再び再開する必要があります。
2 回目の試行:
私の次の試みは、ストリームをPサイズの部分にグループ化し、各部分を並行して処理し、結合してから次の部分に進むことでした。
def process(data: EnumeratorT[Chunk, ErrorOr]): IterateeT[Chunk, ErrorOr, Vector[Result]] =
Iteratee.foldM[Vector[Chunk], ErrorOr, Vector[Result]](Nil) { (rs, cs) =>
tryIO(IO(rs ++ Await.result(
Future.traverse(cs) {
c => Future(processChunk(c))
},
Duration.Inf)))
} &= (data mapE Iteratee.group(P))
これは利用可能なプロセッサーを完全には活用しませんが (特に、それぞれの処理に必要な時間Chunk
は大きく異なる可能性があるため)、改善にはなります。ただし、enumerateegroup
ではメモリ リークが発生しているようです。ヒープの使用量が突然増加します。