3

Accumulator on Spark が GC にならないという問題に遭遇しました。

def newIteration (lastParams: Accumulable[Params, (Int, Int, Int)], lastChosens: RDD[Document], i: Int): Params = {
    if (i == maxIteration)
        return lastParams.value

    val size1: Int = 100
    val size2: Int = 1000

    // each iteration generates a new accumulator
    val params = sc.accumulable(Params(size1, size2))

    // there is map operation here
    // if i only use lastParams, the result in not updated
    // but params can solve this problem 
    val chosen = data.map {
        case(Document(docID, content)) => {
            lastParams += (docID, content, -1)
            val newContent = lastParams.localValue.update(docID, content)
            lastParams += (docID, newContent, 1)
            params += (docID, newContent, 1)
            Document(docID, newContent)
        }
    }.cache()
    chosen.count()
    lastChosens.unpersist()
    return newIteration(params, chosen, i + 1)
}

問題は、メモリが制限されるまで、割り当てるメモリが常に増加していることです。lastParmsGCではないようです。ClassRDDBroadcastmethodunpersist()がありますが、ドキュメントでこのようなメソッドを見つけることができません。

AccumulableGC を自動的にできないのはなぜですか、またはより良い解決策がありますか?

4

1 に答える 1

3

更新 (2016 年 4 月 22 日): SPARK-3885 使用されなくなったアキュムレータを削除するメカニズムを提供する問題が解決されました。

参照されなくなったアキュムレータを自動的にガベージ コレクションするためのサポートを追加する作業が進行中です。この機能の進行状況の追跡 については、 SPARK-3885を参照してください。現在レビュー中のSpark PR #4021は、この機能のパッチです。これは Spark 1.3.0 に含まれることを期待しています。

于 2015-01-21T21:47:25.593 に答える