1

私はScalaで大きなデータを処理しているので、メモリと時間は通常よりもさらに重要なコンパニオンです。いくつかのサブ評価を並行して行い、結果をマージするために、大きなソースファイルでIterator[String]取得したイニシャルを細分化することで、いくつかの評価の速度を上げようとしています。getLinesこれを行うにはslice、イテレータを2つに再帰的に分割し、各サブイテレータの再帰関数を呼び出します。ここで、なぜGCoverheadまたはJavaHeapSpace例外が発生するのか疑問に思っていますが、「重要な」要素は(イテレータのサイズを取得するために)再帰ステップの前に1回だけ評価されますが、私の意見では、再帰ステップでは評価されません。sliceイテレータを再度返します(これは実装によって厳密ではありません)。次の(削減された!)コードは、サブリストを連結する前に、〜15gのファイルに適用できません。

.duplicate各ステップで使用します。APIを調べたところ、.duplicate「実装は、一方のイテレータによって繰り返された要素に一時ストレージを割り当てる可能性がありますが、もう一方のイテレータによってまだ割り当てられていない可能性があります。」と書かれていますが、要素はまだ繰り返されていません。誰かが私にそこで何が間違っているのか、そしてこの問題を解決する方法のヒントを教えてもらえますか?どうもありがとう!

type itType = Iterator[String]
def src = io.Source.fromFile(args(0)).getLines

// recursively divide into equal size blocks in divide&conquer fashion
def getSubItsDC(it: itType, depth: Int = 4) = {
    println("Getting length of file..")
    val totalSize = src.length
    println(totalSize)
    def rec(it_rec: itType = it, depth_rec: Int = depth, size: Int = totalSize): 
        List[itType] = depth_rec match {
            case n if n > 0 => 
                println(n)
                val (it1, it2) = it_rec.duplicate
                val newSize = size/2
                rec(it1 slice (0,newSize), n-1, newSize) ++ 
                    rec(it2 slice (newSize,size), n-1, newSize)
            case n if n == 0 => List(it_rec)
    }
    println("Starting recursion..")
    rec()
}
getSubItsDC(src)

REPLでは、コードは任意のサイズのイテレーターで同じように高速に実行されるため(totalSizeをハードコーディングする場合)、正しい怠惰であると想定しました。

4

1 に答える 1

2

を使用して(a )itr grouped sizeを取得する方がよいと思います。Iterator[Iterator[String]]GroupedIterator

scala> val itr = (1 to 100000000).iterator grouped 1000000
itr: Iterator[Int]#GroupedIterator[Int] = non-empty iterator

これにより、ファイルの一部の処理をチャンクできます。

ソリューションが大量のメモリを使用する理由

を複製することIterator明らかに操作です。これは、 Iterator が計算された値をキャッシュする必要がある場合があることを意味します。例えば:

scala> val itr = (1 to 100000000).iterator
itr: Iterator[Int] = non-empty iterator

scala> itr filter (_ % 10000000 == 0) foreach println
10000000
....
100000000

しかし、私が複製を取るとき:

scala> val (a, b) = (1 to 100000000).iterator.duplicate
a: Iterator[Int] = non-empty iterator
b: Iterator[Int] = non-empty iterator

scala> a filter (_ % 10000000 == 0) foreach println

//oh dear, garbage collecting
Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded

この例では、 を実行すると、重複aするために、反復されたが反復されていないb要素をキャッシュする必要がありますab

于 2012-06-12T15:29:42.750 に答える