0

数値のリストがあるとしましょう:

val list = List(4,12,3,6,9)

リスト内のすべての要素について、ローリングサムを見つける必要があります。最終的な出力は次のようになります。

List(4, 16, 19, 25, 34)

リストの 2 つの要素 (現在の要素と前の要素) を入力として取り、両方に基づいて計算できる変換はありますか? 何かのようなものmap(initial)((curr,prev) => curr+prev)

共有されたグローバル状態を維持せずにこれを達成したいと考えています。

編集:RDDで同じ種類の計算を実行できるようにしたいと思います。

4

3 に答える 3

1

以下のcumSumメソッドは、 、 、 など、暗黙的に利用可能な任意のに対して機能するはずですRDD[N]NNumeric[N]IntLongBigIntDouble

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

def cumSum[N : Numeric : ClassTag](rdd: RDD[N]): RDD[N] = {
  val num = implicitly[Numeric[N]]
  val nPartitions = rdd.partitions.length

  val partitionCumSums = rdd.mapPartitionsWithIndex((index, iter) => 
    if (index == nPartitions - 1) Iterator.empty
    else Iterator.single(iter.foldLeft(num.zero)(num.plus))
  ).collect
   .scanLeft(num.zero)(num.plus)

  rdd.mapPartitionsWithIndex((index, iter) => 
    if (iter.isEmpty) iter
    else {
      val start = num.plus(partitionCumSums(index), iter.next)
      iter.scanLeft(start)(num.plus)
    }
  )
}

この方法を、「ゼロ」を持つ任意の連想二項演算子 (つまり、任意のモノイド) に一般化するのはかなり簡単です。並列化の鍵となるのは、連想性です。この連想性がないと、通常、 のエントリを順番に実行することになりますRDD

于 2017-06-15T08:12:50.070 に答える