5

scalaz iteratee パッケージを使用して、一定のスペースで大きな zip ファイルを処理しようとしています。zip ファイル内の各ファイルに対して実行する必要がある長時間実行プロセスがあります。これらのプロセスは並行して実行できます (また、並行して実行する必要があります)。

それぞれをオブジェクトにEnumeratorT膨らませるを作成しました。署名は次のようになります。ZipEntryFile

def enumZipFile(f:File):EnumeratorT[IoExceptionOr[IO[File]], IO]

IterateeT各ファイルに対して長時間実行されるプロセスを実行するを添付したいと思います。私は基本的に次のようなものになります:

type IOE[A] = IoExceptionOr[A]

def action(f:File):IO[List[Promise[IOE[File]]]] = (
  consume[Promise[IOE[File]], IO, List] %=
  map[IOE[File], Promise[IOE[File]], IO](longRunningProcess) %=
  map[IOE[IO[File]], IOE[File], IO](_.unsafePerformIO) &=
  enumZipFile(f)
).run

def longRunningProcess:(iof:IOE[File]):Promise[IOE[File]] =
  Promise { Thread.sleep(5000); iof }

実行しようとすると:

action(new File("/really/big/file.zip")).unsafePerformIO.sequence.get

java.lang.OutOfMemoryError: Java heap spaceメッセージが届きます。IOこれらすべてとPromiseオブジェクトのメモリに大量のリストを構築しようとしているので、これは私には理にかなっています。

いくつかの質問:

  • これを回避する方法について誰かアイデアがありますか? longRunningProcess私は本当にその副作用だけを気にしているので、私は問題に間違ってアプローチしているように感じます.
  • ここでのEnumeratorアプローチは間違ったアプローチですか?

私はアイデアがほとんどないので、何でも役に立ちます。

ありがとう!

更新 #1

スタック トレースは次のとおりです。

[error] java.lang.OutOfMemoryError: Java heap space
[error]         at scalaz.Free.flatMap(Free.scala:46)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)

私は現在、nadavwr のアドバイスを受けて、すべてが私が思うように動作していることを確認しています。更新があればまた報告します。

アップデート #2

以下の両方の回答からのアイデアを使用して、適切な解決策を見つけました。huynhjl が示唆したように (そして、ヒープ ダンプを分析するという nadavwr の提案を使用して確認しました)、consumeすべての膨張ZipEntryしたものがメモリに保持されていたため、プロセスがメモリ不足になりました。ファイルへの参照ではなく を返すように、実行時間の長いプロセスに変更consumeして更新しました。そうすれば、最後にすべての IoExceptions のコレクションを取得できます。実用的なソリューションは次のとおりです。foldMPromise[IOE[Unit]]

def action(f:File):IO[List[Promise[IOE[Unit]]]] = (
  foldM[Promise[IOE[Unit]], IO, List[Promise[IOE[Unit]]]](List.empty)((acc,x) => IO(x :: acc)) %=
  map[IOE[File], Promise[IOE[Unit]], IO](longRunningProcess) %=
  map[IOE[IO[File]], IOE[File], IO](_.unsafePerformIO) &=
  enumZipFile(f)
).run

def longRunningProcess:(iof:IOE[File]):Promise[IOE[Unit]] =
  Promise { Thread.sleep(5000); iof.map(println) }

このソリューションは、各エントリを非同期にアップロードしながらインフレートします。Promise最後に、エラーを含む実行済みオブジェクトの膨大なリストがあります。これが Iteratee の正しい使用法であるとはまだ完全には確信していませんが、システムの他の部分で使用できる再利用可能で構成可能な部分がいくつかあります (これは私たちにとって非常に一般的なパターンです)。

ご助力いただきありがとうございます!

4

3 に答える 3

4

使用しないでくださいconsume。私の他の最近の回答を参照してください: How to use IO with Scalaz7 Iteratees without Overflowing the stack?

foldMより良い選択かもしれません。

また、ファイルを別のもの (成功の戻りコードなど) にマップして、JVM が膨張した zip エントリをガベージ コレクションできるかどうかを確認してください。

于 2013-04-26T14:57:06.630 に答える
1

どれくらい高価ですか (メモリに関してはlongRunningProcess? ファイルのデフレはどうですか? 期待した回数だけ実行されていますか? (単純なカウンターが役に立ちます)

スタック トレースは、ラクダの背中を折ったストローを特定するのに役立ちます。それが原因である場合もあります。

何が大量のメモリを占有しているのかを確認したい場合は、-XX:+HeapDumpOnOutOfMemoryErrorJVM 引数を使用してから、VisualVM、Eclipse MAT、またはその他のヒープ アナライザーで分析できます。

ファローアップ

あなたが約束を列挙しているのは私には奇妙に思えます。列挙子と iteratee の両方から独立して計算を開始するのは直観に反します。反復ベースのソリューションは、約束の代わりに「不活性」要素を返す列挙子によってより適切に提供される場合があります。残念ながら、それでは個々のファイルの処理がシリアルになりますが、それは ya の反復処理です -- ノンブロッキング ストリーム処理です。

アクターベースのソリューションの方が私見には適していますが、アクターと反復 (特に後者) の両方が、達成しようとしているもの (少なくとも共有している部分) に対して過剰に思えます。

Scala 2.10 の scala.concurrent パッケージの単純な先物/約束を検討してください。また、Scala の並列コレクションも必ず確認してください。これらが不十分であることが証明されるまで、追加の概念をコードに導入することはありません。並列処理を制限するために、固定サイズの ExecutionContext を定義してみてください。

于 2013-04-26T13:33:14.823 に答える