8

Scala 標準ライブラリを使用すると、次のようなことができます。

scala> val scalaList = List(1,2,3)
scalaList: List[Int] = List(1, 2, 3)

scala> scalaList.foldLeft(0)((acc,n)=>acc+n)
res0: Int = 6

多くの Int から 1 つの Int を作成します。

そして、私は次のようなことができます:

scala> scalaList.foldLeft("")((acc,n)=>acc+n.toString)
res1: String = 123

多くの Int から 1 つの String を作成します。

したがって、foldLeft は同種または異種のいずれかである可能性があり、必要に応じて 1 つの API に含まれます。

Spark で、多くの Int から 1 つの Int が必要な場合は、次のようにできます。

scala> val rdd = sc.parallelize(List(1,2,3))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:12
scala> rdd.fold(0)((acc,n)=>acc+n)
res1: Int = 6

フォールド API は foldLeft に似ていますが、均一であるだけで、RDD[Int] はフォールドでのみ Int を生成できます。

spark にも集約 API があります。

scala> rdd.aggregate("")((acc,n)=>acc+n.toString, (s1,s2)=>s1+s2)
res11: String = 132

これは異種混合であり、RDD[Int] は String を生成できるようになりました。

では、なぜフォールドと集約が Spark で 2 つの異なる API として実装されているのでしょうか?

同種と異種の両方であるfoldLeftのように設計されていないのはなぜですか?

(私は Spark に非常に慣れていないため、これがばかげた質問である場合はご容赦ください。)

4

3 に答える 3

4

fold固定の評価順序に依存しないため、より効率的に実装できます。そのため、各クラスタ ノードはfold独自のチャンクを並行して作成foldし、最後に全体的に小さな 1 つのチャンクを作成できます。一方、foldLeft各要素は順番に折りたたむ必要があり、並行して行うことはできません。

(また、便宜上、一般的なケースのより単純な API があると便利です。標準の lib には、この理由reduceと同様にあります)foldLeft

于 2014-10-29T16:36:18.600 に答える
2

特にSparkでは計算が分散して並列に行われるためfoldLeft、標準ライブラリのままでは実装できません。fold代わりに、集計には 2 つの関数が必要です。1 つは type の各要素に対してと同様の操作を実行してtypeTの値を生成し、もう 1 つは各パーティションの を最終的な値にU結合します。U

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
于 2014-10-29T17:06:04.593 に答える
1

foldLeft, foldRight, reduceLeft, reduceRight, scanLeftおよびscanRightは、蓄積されたパラメーターが入力パラメーターと異なる可能性がある操作 ( (A, B) -> B) であり、これらの操作は順番にしか実行できません。

foldは、蓄積されたパラメータが入力パラメータと同じタイプでなければならない操作です ( (A, A) -> A)。その後、並列実行できます。

aggregationは、累積されたパラメーターが入力パラメーターとは異なる型になる可能性がある操作ですが、累積されたパラメーターを最終結果に集約する方法を定義する追加の関数を提供する必要があります。この操作により、並列実行が可能になります。操作はとのaggregation組み合わせです。foldLeftfold

詳細については、「並列プログラミング」コースのコーセラ ビデオをご覧ください。

于 2017-07-09T17:44:36.757 に答える