26

RDDs1つまたは複数の列で2つの通常を結合する必要があります。論理的には、この操作は 2 つのテーブルのデータベース結合操作と同等です。これだけで可能なのか、Spark SQLそれとも他の方法があるのか​​ 疑問に思います。

具体的な例として、r1主キーを持つ RDD を考えてみましょうITEM_ID:

(ITEM_ID, ITEM_NAME, ITEM_UNIT, COMPANY_ID)

r2主キーを持つRDD COMPANY_ID

(COMPANY_ID, COMPANY_NAME, COMPANY_CITY)

参加したいr1r2.

これはどのように行うことができますか?

4

4 に答える 4

29

Soumya Simanta は良い答えを出しました。ただし、結合された RDD の値はIterableであるため、結果は通常のテーブル結合とあまり似ていない可能性があります。

または、次のこともできます。

val mappedItems = items.map(item => (item.companyId, item))
val mappedComp = companies.map(comp => (comp.companyId, comp))
mappedItems.join(mappedComp).take(10).foreach(println)

出力は次のようになります。

(c1,(Item(1,first,2,c1),Company(c1,company-1,city-1)))
(c1,(Item(2,second,2,c1),Company(c1,company-1,city-1)))
(c2,(Item(3,third,2,c2),Company(c2,company-2,city-2)))
于 2014-12-12T07:36:38.960 に答える
12

(Scala を使用) 2 つの RDD があるとします。

  • emp: (empid、ename、dept)

  • 部門: (dname、部門)

以下は別の方法です。

//val emp = sc.parallelize(Seq((1,"jordan",10), (2,"ricky",20), (3,"matt",30), (4,"mince",35), (5,"rhonda",30)))
val emp = sc.parallelize(Seq(("jordan",10), ("ricky",20), ("matt",30), ("mince",35), ("rhonda",30)))

val dept = sc.parallelize(Seq(("hadoop",10), ("spark",20), ("hive",30), ("sqoop",40)))

//val shifted_fields_emp = emp.map(t => (t._3, t._1, t._2))
val shifted_fields_emp = emp.map(t => (t._2, t._1))

val shifted_fields_dept = dept.map(t => (t._2,t._1))

shifted_fields_emp.join(shifted_fields_dept)
// Create emp RDD
val emp = sc.parallelize(Seq((1,"jordan",10), (2,"ricky",20), (3,"matt",30), (4,"mince",35), (5,"rhonda",30)))

// Create dept RDD
val dept = sc.parallelize(Seq(("hadoop",10), ("spark",20), ("hive",30), ("sqoop",40)))

// Establishing that the third field is to be considered as the Key for the emp RDD
val manipulated_emp = emp.keyBy(t => t._3)

// Establishing that the second field need to be considered as the Key for dept RDD
val manipulated_dept = dept.keyBy(t => t._2)

// Inner Join
val join_data = manipulated_emp.join(manipulated_dept)
// Left Outer Join
val left_outer_join_data = manipulated_emp.leftOuterJoin(manipulated_dept)
// Right Outer Join
val right_outer_join_data = manipulated_emp.rightOuterJoin(manipulated_dept)
// Full Outer Join
val full_outer_join_data = manipulated_emp.fullOuterJoin(manipulated_dept)

// Formatting the Joined Data for better understandable (using map)
val cleaned_joined_data = join_data.map(t => (t._2._1._1, t._2._1._2, t._1, t._2._2._1))

これにより、次のような出力が得られます。

// 出力 clean_joined_data をコンソールに出力します

scala> cleaned_joined_data.collect()
res13: Array[(Int, String, Int, String)] = Array((3,matt,30,hive), (5,rhonda,30,hive), (2,ricky,20,spark), (1,jordan,10,hadoop))
于 2015-08-14T23:52:38.130 に答える