私は Apache Spark の初心者です。私は現在、機械学習プログラムに取り組んでいます。これには、RDD を繰り返し更新し、約 10KB のデータをエグゼキューターからドライバーに収集する必要があります。残念ながら、600 回以上の反復を実行すると、StackOverFlow エラーが発生します。以下は私のコードです。イテレーション数が400を超えると、collectAsMap関数でstackoverflowエラーが発生しました! ここで、indexedDevF と indexedData は indexedRDD です ( https://github.com/amplab/spark-indexedrddで提供されるライブラリとして AMPLab によって開発されました) 。
breakable{
while(bLow > bHigh + 2*tolerance){
indexedDevF = indexedDevF.innerJoin(indexedData){(id, a, b) => (b, a)}.mapValues( x => ( x._2 + alphaHighDiff * broad_y.value(iHigh) * kernel(x._1, dataiHigh) + alphaLowDiff * broad_y.value(iLow) * kernel(x._1, dataiLow) ) )
if (iteration % 50 == 0 ) {
indexedDevF.checkpoint()
}
indexedDevF.persist() // essential to get correct answer
val devFMap = indexedDevF.collectAsMap() //0.5s every time according to local:4040! here will stackoverflow
var min_value = Double.PositiveInfinity
var max_value = -min_value
var min_i = -1
var max_i = -1
i = 0
while( i < m ){
if(((y(i) > 0) && (alpha(i) < cEpsilon)) || ((y(i) < 0) && (alpha(i) > epsilon))){
if( devFMap(i) <= min_value){
min_value = devFMap(i)
min_i = i
}
}
if(((y(i) > 0) && (alpha(i) > epsilon)) || ((y(i) < 0) && (alpha(i) < cEpsilon))){
if( devFMap(i) >= max_value ){
max_value = devFMap(i)
max_i = i
}
}
i = i+1
}
iHigh = min_i
iLow = max_i
bHigh = devFMap(iHigh)
bLow = devFMap(iLow)
dataiHigh = indexedData.get(iHigh.toLong).get
dataiLow = indexedData.get(iLow.toLong).get
eta = 2 - 2 * kernel(dataiHigh, dataiLow)
alphaHighOld = alpha(iHigh)
alphaLowOld = alpha(iLow)
var alphaDiff = alphaLowOld - alphaHighOld
var lowLabel = y(iLow)
var sign = y(iHigh) * lowLabel
var alphaLowLowerBound = 0D
var alphaLowUpperBound = 0D
if (sign < 0){
if (alphaDiff < 0){
alphaLowLowerBound = 0;
alphaLowUpperBound = cost + alphaDiff;
}
else{
alphaLowLowerBound = alphaDiff;
alphaLowUpperBound = cost;
}
}
else{
var alphaSum = alphaLowOld + alphaHighOld;
if (alphaSum < cost){
alphaLowUpperBound = alphaSum;
alphaLowLowerBound = 0;
}
else{
alphaLowLowerBound = alphaSum - cost;
alphaLowUpperBound = cost;
}
}
if (eta > 0){
alphaLowNew = alphaLowOld + lowLabel*(bHigh - bLow)/eta;
if (alphaLowNew < alphaLowLowerBound)
alphaLowNew = alphaLowLowerBound;
else if (alphaLowNew > alphaLowUpperBound)
alphaLowNew = alphaLowUpperBound;
}
else{
var slope = lowLabel * (bHigh - bLow);
var delta = slope * (alphaLowUpperBound - alphaLowLowerBound);
if (delta > 0){
if (slope > 0)
alphaLowNew = alphaLowUpperBound;
else
alphaLowNew = alphaLowLowerBound;
}
else
alphaLowNew = alphaLowOld;
}
alphaLowDiff = alphaLowNew - alphaLowOld;
alphaHighDiff = -sign*(alphaLowDiff);
alpha(iLow) = alphaLowNew;
alpha(iHigh) = (alphaHighOld + alphaHighDiff);
if(iteration % 50 == 0)
print(".")
iteration = iteration + 1;
}
===================
元の質問は次のとおりです。チェックポイントが役に立たないことがわかり、プログラムはstackoverflowエラーで終了します!! 問題を説明するためのテスト用の簡単なコードを作成します。幸いなことに、親切な人が問題を解決するのを手伝ってくれます。答えは以下にあります! ただし、チェックポイントが実際に機能しても、プログラムでスタックオーバーフローエラーが発生します:(
for(i <- 1 to 1000){
a = a.map(x => x+1).persist
var b = a.collect()
if(i%100 == 0){
a.checkpoint()
}
print(".")
}