2

したがって、Spark はロジスティック回帰などの単一の RDD で反復アルゴリズムを実行できることを理解しています。

    val points = spark.textFile(...).map(parsePoint).cache()
    var w = Vector.random(D) // current separating plane
    for (i <- 1 to ITERATIONS) {
      val gradient = points.map(p =>
        (1 / (1 + exp(-p.y*(w dot p.x))) - 1) * p.y * p.x
      ).reduce(_ + _)
      w -= gradient
    }

上記の例はw、各反復後に更新されるグローバル状態を維持し、その更新された値が次の反復で使用されるため、反復的です。この機能は Spark ストリーミングで可能ですか? 同じ例を考えてみましょう。ただし、今pointsは DStream です。この場合、勾配を計算する新しい DStream を作成できます。

val gradient = points.map(p =>
            (1 / (1 + exp(-p.y*(w dot p.x))) - 1) * p.y * p.x
          ).reduce(_ + _)

しかし、グローバル状態をどのように処理しますかwwDStreamでもある必要があるように思えますが(おそらく使用updateStateByKey)、その最新の値をpointsmap関数に渡す必要がありますが、これは不可能だと思います。DStreams がこの方法で通信できるとは思いません。私は正しいですか、それとも Spark Streaming でこのような反復計算を行うことは可能ですか?

4

2 に答える 2

3

foreachRDD 関数を使用すると、これが非常に簡単であることがわかりました。MLlib は、DStreams でトレーニングできるモデルを実際に提供しています。私は、streamingLinearAlgorithmコードで答えを見つけました。グローバル更新変数をドライバーでローカルに保持し、.foreachRDD 内で更新できるように見えるので、実際には DStream 自体に変換する必要はありません。したがって、これを私が提供した例に適用できます

points.foreachRDD{(rdd,time) =>

     val gradient=rdd.map(p=> (1 / (1 + exp(-p.y*(w dot p.x))) - 1) * p.y * p.x
     )).reduce(_ + _)

  w -= gradient

  }
于 2015-03-18T15:17:47.587 に答える