2

私のアキュムレータは、RDD の foreach 操作でアキュムレータを更新した後の Array[Array[Int]] です。accumulator(0) は期待どおりですが、accumulator(1) は完全に失われた Array(0,0,0) です。

RDD 内では、アキュムレータ値は Array(Array(4,5,6),Array(4,5,6)) RDD 外では、アキュムレータ値は Array(Array(4,5,6),Array(0,0,0) です)))

以下はコードです

import org.apache.spark.Accumulable
import org.apache.spark.AccumulableParam
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object acc {
  def main(args: Array[String]) {
     val conf = new SparkConf().setAppName("Simple Application")
  val sc = new SparkContext(conf)
  val a =Array(Array(1,2,3),Array(4,5,6))
  val rdd = sc.parallelize(a)
  val initialValue = Array.fill[Array[Int]](2)(Array.fill[Int](3)(1))
  val accumulator = sc.accumulable(initialValue)(MatrixAccumulatorParam)
  rdd.foreach{x=>
     accumulator += (x(0),0,0)
     accumulator += (x(1),0,1)
     accumulator += (x(2),0,2)
     accumulator += (x(0),1,0)
     accumulator += (x(1),1,1)
     accumulator += (x(2),1,2)
     println("accumulator value in rdd is"+accumulator.localValue)
     }

  println("accumulator value out of rdd is :" + accumulator.value )

  }

}
object MatrixAccumulatorParam extends AccumulableParam[Array[Array[Int]], (Int, Int,   Int)] {

  def zero(initialValue: Array[Array[Int]]): Array[Array[Int]] = {
    initialValue
  }

  def addAccumulator(acc: Array[Array[Int]], value: (Int, Int, Int)): Array[Array[Int]] = {

    acc(value._2)(value._3) = value._1
    acc

  }

   def addInPlace(m1: Array[Array[Int]], m2: Array[Array[Int]]): Array[Array[Int]] = {
    val columnLength: Int = m1.length
    val rowLength: Int = m1(0).length
    var updatedMatrix = Array.ofDim[Int](columnLength, rowLength)

    var j: Int = 0
    while (j < columnLength) {
      var i =0
    while (i < rowLength) {
         val a = Math.max(m1(j)(i), m2(j)(i))
        updatedMatrix(j)(i) = a
        i += 1
      } 
      j += 1
    }

    updatedMatrix
      }


}

結果: RDD 内では、アキュムレータ値は Array(Array(4,5,6),Array(4,5,6)) です RDD 外では、アキュムレータ値は Array(Array(4,5,6),Array(0,0) です,0))

しかし、RDDの外で私が期待しているのは Array(Array(4,5,6),Array(4,5,6)) です

4

3 に答える 3

3

accumulator.variable が更新されるたびに、 addAccumulatorメソッドが呼び出されます。

上記のコード accumulator += (x(0),0,0) では、addAccumulator メソッドを呼び出します。

すべてのタスクが完了すると、 addInPlaceメソッドが呼び出され、すべてのタスクから累積された値が集計されます。

上記のコードでは、initialValue Array(1, 1, 1)Array(1, 1, 1) およびタスク Accumulator 値 Array(4, 5, 6) Array(4, 5, 6) が addInPlace メソッドを呼び出します。

上記のコードでは、 addInPlace メソッドの変数 i は、ループに入るたびにリセットする必要がありますwhile (j < columnLength) {

次のコードは魅力のように機能します。

            while (j < columnLength) {
              i=0
                while (i < rowLength) {
                  println("m1(j)(i)"+ m1(j)(i))
                  println(" m2(j)(i))"+ m2(j)(i))
                    val a = Math.max(m1(j)(i), m2(j)(i))
                            updatedMatrix(j)(i) = a
                            i += 1
                } 
                j += 1
            }
于 2014-12-09T13:52:27.730 に答える
0

var i=0 を i=0 に変更しても違いがないことがわかり、最終結果は Array(Array(4,5,6),Array(4,5,6)) です

アプリケーションの出力は、yarn logs -applicationId によってフェッチされます。

コードは次のとおりです。

import org.apache.spark.Accumulable
import org.apache.spark.AccumulableParam
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object acc {
  def main(args: Array[String]) {
     //val conf = new SparkConf().setAppName("Simple Application")
  val conf = new SparkConf()
  conf.setSparkHome("/usr/lib/spark")
  conf.setAppName("Simple Application")
  val sc = new SparkContext(conf)
  val a =Array(Array(1,2,3),Array(4,5,6))
  val rdd = sc.parallelize(a)
  val initialValue = Array.fill[Array[Int]](2)(Array.fill[Int](3)(1))
  val accumulator = sc.accumulable(initialValue)(MatrixAccumulatorParam)
  rdd.foreach{x=>
     accumulator += (x(0),0,0)
     accumulator += (x(1),0,1)
     accumulator += (x(2),0,2)
     accumulator += (x(0),1,0)
     accumulator += (x(1),1,1)
     accumulator += (x(2),1,2)
     val columnLength: Int = accumulator.localValue.length
     val rowLength: Int = accumulator.localValue(0).length
     var j: Int = 0
     var i: Int = 0
     println("accumulator")
     while(j < columnLength){
        i =0 
        while(i<rowLength){
            println(accumulator.localValue(j)(i))
            i += 1
        }
        j+=1
     }
     println("accumulator value in rdd is"+accumulator.localValue)
     }
     val columnLength: Int = accumulator.value.length
     val rowLength: Int = accumulator.value(0).length
     var j: Int = 0
     var i: Int = 0
     println("total")
     while(j < columnLength){
        i =0 
        while(i<rowLength){
            println(accumulator.value(j)(i))
            i += 1
        }
        j+=1
     }

  println("accumulator value out of rdd is :" + accumulator.value )

  }

}
object MatrixAccumulatorParam extends AccumulableParam[Array[Array[Int]], (Int, Int,   Int)] {

  def zero(initialValue: Array[Array[Int]]): Array[Array[Int]] = {
    initialValue
  }

  def addAccumulator(acc: Array[Array[Int]], value: (Int, Int, Int)): Array[Array[Int]] = {

    acc(value._2)(value._3) = value._1
    acc
  }

   def addInPlace(m1: Array[Array[Int]], m2: Array[Array[Int]]): Array[Array[Int]] = {
    val columnLength: Int = m1.length
    val rowLength: Int = m1(0).length
    var updatedMatrix = Array.ofDim[Int](columnLength, rowLength)

    var j: Int = 0
    var i: Int = 0
    while (j < columnLength) {
    i =0
    while (i < rowLength) {
        println("m1("+j+")("+i+")="+ m1(j)(i) + " m2("+j+")("+i+")="+ m2(j)(i))
        val a = Math.max(m1(j)(i), m2(j)(i))
        updatedMatrix(j)(i) = a
        i += 1
      } 
      j += 1
    }

    updatedMatrix
  }
}

結果は次のとおりです。

accumulator
4
5
6
4
5
6

accumulator
1
2
3
1
2
3

m1(0)(0)=1 m2(0)(0)=1
m1(0)(1)=1 m2(0)(1)=2
m1(0)(2)=1 m2(0)(2)=3
m1(1)(0)=1 m2(1)(0)=1
m1(1)(1)=1 m2(1)(1)=2
m1(1)(2)=1 m2(1)(2)=3
m1(0)(0)=1 m2(0)(0)=4
m1(0)(1)=2 m2(0)(1)=5
m1(0)(2)=3 m2(0)(2)=6
m1(1)(0)=1 m2(1)(0)=4
m1(1)(1)=2 m2(1)(1)=5
m1(1)(2)=3 m2(1)(2)=6

total
4
5
6
4
5
6

コードを次のように変更します。

    //var i: Int = 0
    while (j < columnLength) {
    var i =0

結果は次のとおりです。

m1(0)(0)=1 m2(0)(0)=1
m1(0)(1)=1 m2(0)(1)=2
m1(0)(2)=1 m2(0)(2)=3
m1(1)(0)=1 m2(1)(0)=1
m1(1)(1)=1 m2(1)(1)=2
m1(1)(2)=1 m2(1)(2)=3
m1(0)(0)=1 m2(0)(0)=4
m1(0)(1)=2 m2(0)(1)=5
m1(0)(2)=3 m2(0)(2)=6
m1(1)(0)=1 m2(1)(0)=4
m1(1)(1)=2 m2(1)(1)=5
m1(1)(2)=3 m2(1)(2)=6
total
4
5
6
4
5
6

accumulator
1
2
3
1
2
3

accumulator
4
5
6
4
5
6

最終結果は同じです。

しかし、2 つの質問があります。

  • 2 つの出力順序が同じでない理由はわかりません。
  • addInplace 関数が 2 回呼び出されるのはなぜですか?
    • この関数が 2 回呼び出される理由はわかっていると思いますが、よくわかりません
      • 初期化: Array(Array(1,1,1),Array(1,1,1)
      • タスクからの出力: Array(Array(1,2,3),Array(1,2,3)
      • 他のタスクからの出力: Array(Array(4,5,6),Array(4,5,6)

@ビジェイ・イナムリ

于 2015-11-10T08:07:13.743 に答える
0

ドキュメントによると、 localValue は異なるはずです。

  • これはアキュムレータのグローバル値ではありません。グローバル値を取得するには
  • データセットに対する操作が完了しました。 を呼び出しますvalue。*
  • このメソッドの典型的な使用法は、ローカル値を直接変更することです。
  • 要素をセットに。*/
于 2014-12-19T10:37:19.427 に答える