2

私は Apache Spark の初心者です。私は現在、機械学習プログラムに取り組んでいます。これには、RDD を繰り返し更新し、約 10KB のデータをエグゼキューターからドライバーに収集する必要があります。残念ながら、600 回以上の反復を実行すると、StackOverFlow エラーが発生します。以下は私のコードです。イテレーション数が400を超えると、collectAsMap関数でstackoverflowエラーが発生しました! ここで、indexedDevF と indexedData は indexedRDD です ( https://github.com/amplab/spark-indexedrddで提供されるライブラリとして AMPLab によって開発されました) 。

breakable{
  while(bLow > bHigh + 2*tolerance){
    indexedDevF = indexedDevF.innerJoin(indexedData){(id, a, b) => (b, a)}.mapValues( x => ( x._2 + alphaHighDiff * broad_y.value(iHigh) * kernel(x._1, dataiHigh) + alphaLowDiff * broad_y.value(iLow) * kernel(x._1, dataiLow) ) )
    if (iteration % 50 == 0 ) {
          indexedDevF.checkpoint()
    }
    indexedDevF.persist()  // essential to get correct answer

    val devFMap = indexedDevF.collectAsMap() //0.5s every time according to local:4040! here will stackoverflow

    var min_value = Double.PositiveInfinity
    var max_value = -min_value
    var min_i = -1
    var max_i = -1

    i = 0
    while( i < m ){

      if(((y(i) > 0) && (alpha(i) < cEpsilon)) || ((y(i) < 0) && (alpha(i) > epsilon))){
          if( devFMap(i) <= min_value){
              min_value = devFMap(i)
              min_i = i
          }
      }

      if(((y(i) > 0) && (alpha(i) > epsilon)) || ((y(i) < 0) && (alpha(i) < cEpsilon))){
          if( devFMap(i) >= max_value ){
              max_value = devFMap(i)
              max_i = i
          }
      }
      i = i+1
    }

    iHigh = min_i
    iLow = max_i
    bHigh = devFMap(iHigh)
    bLow = devFMap(iLow) 

    dataiHigh = indexedData.get(iHigh.toLong).get
    dataiLow = indexedData.get(iLow.toLong).get 

    eta = 2 - 2 * kernel(dataiHigh, dataiLow)

    alphaHighOld = alpha(iHigh)
    alphaLowOld = alpha(iLow)
    var alphaDiff = alphaLowOld - alphaHighOld
    var lowLabel = y(iLow)
    var sign = y(iHigh) * lowLabel

    var alphaLowLowerBound = 0D
    var alphaLowUpperBound = 0D

    if (sign < 0){
        if (alphaDiff < 0){
            alphaLowLowerBound = 0;
            alphaLowUpperBound = cost + alphaDiff;
        }
        else{
            alphaLowLowerBound = alphaDiff;
            alphaLowUpperBound = cost;
        }
    }
    else{
        var alphaSum = alphaLowOld + alphaHighOld;
        if (alphaSum < cost){
            alphaLowUpperBound = alphaSum;
            alphaLowLowerBound = 0;
        }
        else{
            alphaLowLowerBound = alphaSum - cost;
            alphaLowUpperBound = cost;
        }
    }

    if (eta > 0){
        alphaLowNew = alphaLowOld + lowLabel*(bHigh - bLow)/eta;
        if (alphaLowNew < alphaLowLowerBound)
            alphaLowNew = alphaLowLowerBound;
        else if (alphaLowNew > alphaLowUpperBound) 
            alphaLowNew = alphaLowUpperBound;
    }
    else{
        var slope = lowLabel * (bHigh - bLow);
        var delta = slope * (alphaLowUpperBound - alphaLowLowerBound);
        if (delta > 0){
            if (slope > 0)  
                alphaLowNew = alphaLowUpperBound;
            else
                alphaLowNew = alphaLowLowerBound;
        }
        else
            alphaLowNew = alphaLowOld;
    }

    alphaLowDiff = alphaLowNew - alphaLowOld;
    alphaHighDiff = -sign*(alphaLowDiff);
    alpha(iLow) = alphaLowNew;
    alpha(iHigh) = (alphaHighOld + alphaHighDiff);


    if(iteration % 50 == 0)
      print(".")

    iteration = iteration + 1;


}

===================

元の質問は次のとおりです。チェックポイントが役に立たないことがわかり、プログラムはstackoverflowエラーで終了します!! 問題を説明するためのテスト用の簡単なコードを作成します。幸いなことに、親切な人が問題を解決するのを手伝ってくれます。答えは以下にあります! ただし、チェックポイントが実際に機能しても、プログラムでスタックオーバーフローエラーが発生します:(

for(i <- 1 to 1000){
  a = a.map(x => x+1).persist
  var b = a.collect()
  if(i%100 == 0){
    a.checkpoint()
  }
  print(".")
}
4

1 に答える 1

1

RDD.checkpointドキュメントを見ると、次のように書かれています。

この関数は、このRDDでジョブが実行される前に呼び出す必要があります

実際、コードを少し変更して、収集する前aにチェックポイントを実行すると、 no で機能しStackOverflowErrorます。

for(i <- 1 to 1000){
  a = a.map(x => x+1).persist

  if(i%100 == 0){
    a.checkpoint()
  }

  var b = a.collect()

  print(".")
}
于 2016-03-28T08:46:56.500 に答える