1

Spark 0.7.2 と Scala 2.9.3 でプログラミングを始めたばかりです。スタンドアロン マシンで機械学習アルゴリズムをテストしていますが、アルゴリズムの最後のステップでは、2 つのマトリックス間の MSE (平均二乗誤差) を計算する必要があります。A - M||^2 で、2 つの行列の間で要素ごとの減算を行います。A の潜在的なサイズは非常に大きくてまばらであるため、キーは座標 (i,j) であり、値は A の対応する要素とM、すなわち (A_ij, M_ij)。ML アルゴリズム全体が勾配降下であるため、反復ごとに MSE を計算し、特定のしきい値に対してテストします。ただし、プログラム全体は、反復ごとに MSE を計算せずに正常に実行されます。プログラムは次のようになります。

val ITERATIONS = 100
for (i <- 1 to ITERATIONS) {
  ... // calculate M for each iteration
  val mse = A.map{ x => 
    val A_ij = x._2(0) 
    val M_ij = x._2(1)
    (A_ij - M_ij) * (A_ij - M_ij)
  }.reduce(_+_)
  ...
}

このプログラムは最大 45 回の繰り返ししか実行できず、次の Spark Exception でクラッシュします。

[error] (run-main) spark.SparkException: Job failed: ShuffleMapTask(764, 0) failed: ExceptionFailure(java.lang.StackOverflowError)
spark.SparkException: Job failed: ShuffleMapTask(764, 0) failed: ExceptionFailure(java.lang.StackOverflowError)
    at spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:642)
    at spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:640)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:640)
    at spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:601)
    at spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:300)
    at spark.scheduler.DAGScheduler.spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:364)
    at spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:107)
java.lang.RuntimeException: Nonzero exit code: 1
    at scala.sys.package$.error(package.scala:27)

もう 1 つの観察結果は、反復ごとにランタイムが約 5% 増加することです。また、「reduce( _ + _ )」がないと、StackOverflowError は発生しません。可能な物理スレッドの総数まで並列処理を増やそうとしましたが、それは役に立ちません。

スタック オーバーフロー エラーの根本的な原因を特定できる方向性を誰かが指摘してくれたことを本当に感謝しています。

編集

  1. A の型は spark.RDD[((Double, Double), Array[Double])] です
  2. stackoverflow 例外で、" at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)" から 61 回繰り返されます。

    13/06/26 00:44:41 ERROR LocalScheduler: Exception in task 0
    java.lang.StackOverflowError
        at java.lang.Exception.<init>(Exception.java:77)
        at java.lang.reflect.InvocationTargetException.<init>(InvocationTargetException.java:54)
        at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
        at java.lang.reflect.Method.invoke(Method.java:597)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:974)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1849)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1753)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1329)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1947)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1871)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1753)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1329)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1947)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1871)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1753)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1329)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:351)
        at scala.collection.immutable.$colon$colon.readObject(List.scala:435)
        at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)
    
  3. 主な反復コード

次のリスト要素にはいくつかのユーティリティ関数が含まれています

while (i <= ITERATION && err >= THRESHOLD) {      
  // AW: group by row, then create key by col
  // split A by row
  // (col, (A_w_M_element, W_row_vector, (row, col)))
  AW = A.map(x =>
    (x._1._1, (x._1, x._2))
  ).cogroup(W).flatMap( x => {
    val wt_i = x._2._2(0)
    val A_i_by_j = x._2._1
    A_i_by_j.map( j => (j._1._2, (j._2, wt_i, j._1)))
  })

  // calculate the X = Wt*A
  X_i_by_j = AW.map( k => 
    (k._1, k._2._2.map(_*k._2._1(0)))
  ).reduceByKey(op_two_arrays(_, _, add))

  // Y = Wt*M = Wt*WH at the same time  
  Y_i_by_j = AW.map( k => 
    (k._1, k._2._2.map(_*k._2._1(2)))
  ).reduceByKey(op_two_arrays(_, _, add))

  // X ./ Y
  X_divide_Y = X_i_by_j.join(Y_i_by_j).map(x => 
    (x._1, op_two_arrays(x._2._1, x._2._2, divide))
  )

  // H = H .* X_divide_Y
  H = H.join(X_divide_Y).map(x => 
    (x._1, op_two_arrays(x._2._1, x._2._2, multiple))
  )

  // Update M = WH
  // M = matrix_multi_local(AW, H)
  A = AW.join(H).map( x => {
    val orig_AwM = x._2._1._1
    val W_row = x._2._1._2
    val cord = x._2._1._3
    val H_col = x._2._2
    // notice that we include original A here as well
    (cord, Array(orig_AwM(0), orig_AwM(1), dot_product_local(W_row, H_col)))
  })

  // split M into two intermediate matrix (one by row, and the other by col)

  /*val M_by_i = M.map(x =>
    (x._1._1, (x._1, x._2))
  )
  val M_by_j = M.map(x =>
    (x._1._2, (x._1, x._2))
  )*/

  // AH: group by col, then create key by row
  // Divide A by row first
  // val AH = matrix_join_local(M_by_j, H)
  AH = A.map(x =>
    (x._1._2, (x._1, x._2))
  ).cogroup(H).flatMap( x => {
    val H_col = x._2._2(0)
    val AM_j_by_i = x._2._1
    AM_j_by_i.map( i => (i._1._1, (i._2, H_col, i._1)))
  })

  // calculate V = At*H
  V_j_by_i = AH.map( k => 
    (k._1, k._2._2.map(_*k._2._1(0)))
  ).reduceByKey(op_two_arrays(_, _, add))

  // calculate U = Mt*H
  U_j_by_i = AH.map( k => 
    (k._1, k._2._2.map(_*k._2._1(2)))
  ).reduceByKey(op_two_arrays(_, _, add))

  // V / U
  V_divide_U = V_j_by_i.join(U_j_by_i).map(x => 
    (x._1, op_two_arrays(x._2._1, x._2._2, divide))
  )

  // W = W .* V_divide_U
  W = W.join(V_divide_U).map(x => 
    (x._1, op_two_arrays(x._2._1, x._2._2, multiple))
  )
  // M = W*H
  A = AH.join(W).map( x => {
    val orig_AwM = x._2._1._1
    val H_col = x._2._1._2
    val cord = x._2._1._3
    val W_row = x._2._2
    // notice that we include original A here as well
    (cord, Array(orig_AwM(0), orig_AwM(1), dot_product_local(W_row, H_col)))
  })  

  // Calculate the error
  // calculate the sequre of difference
  err = A.map( x => (x._2(0) - x._2(2))*(x._2(0) - x._2(2))/A_len).reduce(_+_)
  println("At round " + i + ": MSE is " + err)
}

以下を使用するいくつかのユーティリティ関数:

def op_two_arrays (array1: Array[Double], array2: Array[Double], f: (Double, Double) => Double) : Array[Double] = {
  val len1 = array1.length
  val len2 = array2.length
  if (len1 != len2) {
    return null
  }
  // val new_array : Array[Double] = new Array[Double](len1)
  for (i <- 0 to len1 - 1) {
    array1(i) = f(array1(i), array2(i))
  }
  return array1
}

// element-wise operation
def add (a: Double, b: Double): Double = { a + b }

def multiple (a: Double, b: Double): Double = { a * b }

def divide (a: Double, b: Double): Double = {
  try {
    return a / b
  } catch {
    case x: ArithmeticException => {
      println("ArithmeticException: detect divide by zero")
      return Double.NaN
    }
  }
}

def array_sum (array: Array[Double]) : Double = {
  var sum: Double = 0.0
  for (i <- array) {
    sum += i
  }
  return sum
}

def dot_product (vec1: Array[Double], vec2: Array[Double]) : Double = {
  array_sum(op_two_arrays(vec1, vec2, multiple))
}
4

1 に答える 1