Accumulator on Spark が GC にならないという問題に遭遇しました。
def newIteration (lastParams: Accumulable[Params, (Int, Int, Int)], lastChosens: RDD[Document], i: Int): Params = {
if (i == maxIteration)
return lastParams.value
val size1: Int = 100
val size2: Int = 1000
// each iteration generates a new accumulator
val params = sc.accumulable(Params(size1, size2))
// there is map operation here
// if i only use lastParams, the result in not updated
// but params can solve this problem
val chosen = data.map {
case(Document(docID, content)) => {
lastParams += (docID, content, -1)
val newContent = lastParams.localValue.update(docID, content)
lastParams += (docID, newContent, 1)
params += (docID, newContent, 1)
Document(docID, newContent)
}
}.cache()
chosen.count()
lastChosens.unpersist()
return newIteration(params, chosen, i + 1)
}
問題は、メモリが制限されるまで、割り当てるメモリが常に増加していることです。lastParms
GCではないようです。ClassRDD
とBroadcast
methodunpersist()
がありますが、ドキュメントでこのようなメソッドを見つけることができません。
Accumulable
GC を自動的にできないのはなぜですか、またはより良い解決策がありますか?