49

参加したいRDDが2つあり、次のようになります。

val rdd1:RDD[(T,U)]
val rdd2:RDD[((T,W), V)]

のキー値rdd1が一意であり、 のタプルキー値も一意である場合がありrdd2ます。次のrddが得られるように、2つのデータセットを結合したいと思います:

val rdd_joined:RDD[((T,W), (U,V))]

これを達成するための最も効率的な方法は何ですか? ここに私が考えたいくつかのアイデアがあります。

オプション1:

val m = rdd1.collectAsMap
val rdd_joined = rdd2.map({case ((t,w), u) => ((t,w), u, m.get(t))})

オプション 2:

val distinct_w = rdd2.map({case ((t,w), u) => w}).distinct
val rdd_joined = rdd1.cartesian(distinct_w).join(rdd2)

オプション 1 では、マスタリングするすべてのデータが収集されますよね? そのため、rdd1 が大きい場合、これは適切なオプションとは思えません (私の場合は比較的大きいですが、rdd2 よりも 1 桁小さくなっています)。オプション 2 は醜く明確なデカルト積を行いますが、これも非常に非効率的です。私の頭をよぎった (まだ試していない) 別の可能性は、オプション 1 を実行してマップをブロードキャストすることですが、マップのキーがのキーrdd2

誰もこのような状況に遭遇したことがありますか? お考えいただければ幸いです。

ありがとう!

4

2 に答える 2

14

別の方法として、カスタム パーティショナーを作成し、zipPartitions を使用して RDD に参加する方法があります。

import org.apache.spark.HashPartitioner

class RDD2Partitioner(partitions: Int) extends HashPartitioner(partitions) {

  override def getPartition(key: Any): Int = key match {
    case k: Tuple2[Int, String] => super.getPartition(k._1)
    case _ => super.getPartition(key)
  }

}

val numSplits = 8
val rdd1 = sc.parallelize(Seq((1, "A"), (2, "B"), (3, "C"))).partitionBy(new HashPartitioner(numSplits))
val rdd2 = sc.parallelize(Seq(((1, "Z"), 111), ((1, "ZZ"), 111), ((1, "AA"), 123), ((2, "Y"), 222), ((3, "X"), 333))).partitionBy(new RDD2Partitioner(numSplits))

val result = rdd2.zipPartitions(rdd1)(
  (iter2, iter1) => {
    val m = iter1.toMap
    for {
        ((t: Int, w), u) <- iter2
        if m.contains(t)
      } yield ((t, w), (u, m.get(t).get))
  }
).partitionBy(new HashPartitioner(numSplits))

result.glom.collect
于 2014-04-18T00:25:56.190 に答える