@Stevenの答えは論理的には良いようですが、データに交差する要素が多くない場合(つまり、完全な外部結合により巨大なデータセットが生成される場合)、問題が発生する可能性があります。DataFrames も使用しているため、RDD に変換してから DataFrames に戻すことは、DataFrames API で実行できるタスクには過剰に思えます。これを行う方法を以下に説明します。
いくつかのサンプルデータから始めましょう(例から取得):
val rdd1 = sc.parallelize(Array((1,1,"item A",10), (1,2,"item b",12), (1,3,"item c",12)))
val rdd2 = sc.parallelize(Array((1,2,"item b",50), (1,4,"item c",12), (1,5,"item c",12)))
次に、それらを別の列エイリアスで DataFrame に変換できます。最終的にこれら 2 つの DataFrame を結合すると、後続の選択をより簡単に記述できるため、全体df1
とここで異なるエイリアスを使用します(結合後に列の元を識別する方法があれば、これは不要です)。df2
両方の DataFrame の結合には、フィルタリングする行が含まれていることに注意してください。
val df1 = rdd1.toDF("id_location", "id_item", "name", "price")
val df2 = rdd2.toDF("id_location_2", "id_item_2", "name_2", "price_2")
// df1.unionAll(df2).show()
// +-----------+-------+------+-----+
// |id_location|id_item| name|price|
// +-----------+-------+------+-----+
// | 1| 1|item A| 10|
// | 1| 2|item b| 12|
// | 1| 3|item c| 12|
// | 1| 2|item b| 50|
// | 1| 4|item c| 12|
// | 1| 5|item c| 12|
// +-----------+-------+------+-----+
df1
ここでは、 と の最初の 2 つの要素であるキーで 2 つの DataFrame を結合することから始めdf2
ます。次に、同じ結合キーを持つdf1
行が存在する行 (基本的には から) を選択して、別の DataFrame を作成します。df2
その後、except を実行してdf1
、以前に作成した DataFrame からすべての行を削除します。df1
基本的に行ったことは、 に同一のものがある場所("id_location", "id_item")
からすべての行を削除することであるため、これは補足と見なすことができますdf2
。最後に、補数を結合しdf2
て、出力 DataFrame を生成します。
val df_joined = df1.join(df2, (df1("id_location") === df2("id_location_2")) && (df1("id_item") === df2("id_item_2")))
val df1_common_keyed = df_joined.select($"id_location", $"id_item", $"name", $"price")
val df1_complement = df1.except(df1_common_keyed)
val df_union = df1_complement.unionAll(df2)
// df_union.show()
// +-----------+-------+------+-----+
// |id_location|id_item| name|price|
// +-----------+-------+------+-----+
// | 1| 3|item c| 12|
// | 1| 1|item A| 10|
// | 1| 2|item b| 50|
// | 1| 4|item c| 12|
// | 1| 5|item c| 12|
// +-----------+-------+------+-----+
繰り返しますが、@Steven が提案したように、DataFrame を RDD に変換して実行することで、RDD API を使用できます。それがあなたのやりたいことなら、以下はあなたが使いたいものsubtractByKey()
と上記の入力RDDを達成する別の方法です:
val keyed1 = rdd1.keyBy { case (id_location, id_item, _, _) => (id_location, id_item) }
val keyed2 = rdd2.keyBy { case (id_location, id_item, _, _) => (id_location, id_item) }
val unionRDD = keyed1.subtractByKey(keyed2).values.union(rdd2)
// unionRDD.collect().foreach(println)
// (1,1,item A,10)
// (1,3,item c,12)
// (1,2,item b,50)
// (1,4,item c,12)
// (1,5,item c,12)