scalaz iteratee パッケージを使用して、一定のスペースで大きな zip ファイルを処理しようとしています。zip ファイル内の各ファイルに対して実行する必要がある長時間実行プロセスがあります。これらのプロセスは並行して実行できます (また、並行して実行する必要があります)。
それぞれをオブジェクトにEnumeratorT
膨らませるを作成しました。署名は次のようになります。ZipEntry
File
def enumZipFile(f:File):EnumeratorT[IoExceptionOr[IO[File]], IO]
IterateeT
各ファイルに対して長時間実行されるプロセスを実行するを添付したいと思います。私は基本的に次のようなものになります:
type IOE[A] = IoExceptionOr[A]
def action(f:File):IO[List[Promise[IOE[File]]]] = (
consume[Promise[IOE[File]], IO, List] %=
map[IOE[File], Promise[IOE[File]], IO](longRunningProcess) %=
map[IOE[IO[File]], IOE[File], IO](_.unsafePerformIO) &=
enumZipFile(f)
).run
def longRunningProcess:(iof:IOE[File]):Promise[IOE[File]] =
Promise { Thread.sleep(5000); iof }
実行しようとすると:
action(new File("/really/big/file.zip")).unsafePerformIO.sequence.get
java.lang.OutOfMemoryError: Java heap space
メッセージが届きます。IO
これらすべてとPromise
オブジェクトのメモリに大量のリストを構築しようとしているので、これは私には理にかなっています。
いくつかの質問:
- これを回避する方法について誰かアイデアがありますか?
longRunningProcess
私は本当にその副作用だけを気にしているので、私は問題に間違ってアプローチしているように感じます. - ここでの
Enumerator
アプローチは間違ったアプローチですか?
私はアイデアがほとんどないので、何でも役に立ちます。
ありがとう!
更新 #1
スタック トレースは次のとおりです。
[error] java.lang.OutOfMemoryError: Java heap space
[error] at scalaz.Free.flatMap(Free.scala:46)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
私は現在、nadavwr のアドバイスを受けて、すべてが私が思うように動作していることを確認しています。更新があればまた報告します。
アップデート #2
以下の両方の回答からのアイデアを使用して、適切な解決策を見つけました。huynhjl が示唆したように (そして、ヒープ ダンプを分析するという nadavwr の提案を使用して確認しました)、consume
すべての膨張ZipEntry
したものがメモリに保持されていたため、プロセスがメモリ不足になりました。ファイルへの参照ではなく を返すように、実行時間の長いプロセスに変更consume
して更新しました。そうすれば、最後にすべての IoExceptions のコレクションを取得できます。実用的なソリューションは次のとおりです。foldM
Promise[IOE[Unit]]
def action(f:File):IO[List[Promise[IOE[Unit]]]] = (
foldM[Promise[IOE[Unit]], IO, List[Promise[IOE[Unit]]]](List.empty)((acc,x) => IO(x :: acc)) %=
map[IOE[File], Promise[IOE[Unit]], IO](longRunningProcess) %=
map[IOE[IO[File]], IOE[File], IO](_.unsafePerformIO) &=
enumZipFile(f)
).run
def longRunningProcess:(iof:IOE[File]):Promise[IOE[Unit]] =
Promise { Thread.sleep(5000); iof.map(println) }
このソリューションは、各エントリを非同期にアップロードしながらインフレートします。Promise
最後に、エラーを含む実行済みオブジェクトの膨大なリストがあります。これが Iteratee の正しい使用法であるとはまだ完全には確信していませんが、システムの他の部分で使用できる再利用可能で構成可能な部分がいくつかあります (これは私たちにとって非常に一般的なパターンです)。
ご助力いただきありがとうございます!