0

私は次の例を試していました

val lista = List(("a", 3), ("a", 1), ("b", 7), ("a", 5))
val rdd = sc.parallelize(lista)

次に、シェルで次のように取得します

rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[40] at parallelize at <console>:30

しかし、何らかの理由で、私はまだこの文を実行できたことを理解していません

val resAgg = rdd.aggregateByKey(new HashSet[Int])(_+_, _++_)

これをシェルで取得する

resAgg: org.apache.spark.rdd.RDD[(String, scala.collection.mutable.HashSet[Int])] = ShuffledRDD[41] at aggregateByKey at <console>:32

だから私はいくつかの質問があります:

1.- rdd という名前の var の実際の RDD タイプは何ですか? シェルでは org.apache.spark.rdd.RDD[(String, Int)] 型であることが示されていますが、API を見ると、RDD クラスには aggregateByKey メソッドがありません。ちなみにJavaPairRDDクラスにはaggregateByKeyメソッドがあります

2.- RDDの実際のタイプを確認/知るにはどうすればよいですか

3.- ParallelCollectionRDD が表示されたのは何ですか? 私はgithubでそれを探しましたが、プライベートクラスであることがわかったので、それがscala APIに表示されない理由だと思いますが、それは何のためですか?

Spark 1.6.2を使用していました

4

1 に答える 1

4

あなたが見ているのは、暗黙的な変換の効果です:

  • rdd タイプはありますかorg.apache.spark.rdd.RDD[(String, Int)]
  • 呼び出しを試みたときaggregateByKeyに、この型が存在しない場合、コンパイラは暗黙的な型への変換を探し、この変換をPairRDDFunctions次のように見つけます。

    implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
      (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] = {
      new PairRDDFunctions(rdd)
    }
    
  • 次に、PairRDDFunctions.aggregateByKey呼び出されます。

最後の質問について:

その ParallelCollectionRDD とは

RDD は多くのサブクラスを持つ抽象クラスであり、これはその 1 つです。一般に、各サブクラスは、RDD で実行されるさまざまなアクション (読み取り/書き込み/シャッフル/チェックポイントなど) を担当します。この特定の型は、呼び出し時にSparkContext.parallelize使用されます。つまり、ドライバー プログラムからのコレクションを並列化するために使用されます。実際、これは非公開であり、実際に手元にある RDD のサブタイプを一般的に気にする必要はありません。

于 2016-07-20T07:25:07.503 に答える