1

I have two spark dataframes, with the following structure. As read before using sqlContext.

 itens.columns (scala command) 
 Array[String] = Array(id_location,id_item, name, price)

 rdd1 
 [1,1,item A,10]
 [1,2,item b,12]
 [1,3,item c,12]

 rdd2
 [1,2,item b,50]
 [1,4,item c,12]
 [1,5,item c,12]

I want the following result based on the composite key (id_location,id_item)

 [1,1,item A,10]
 [1,2,item b,50]
 [1,3,item c,12]
 [1,4,item c,12]
 [1,5,item c,12]

So, I want a result with distinct itens (regarding the composite key), but when I found a record with the same key in the both rdds, I want just keep with the record from rdd2.

Anyone have this kind of requirement ?

I am working with spark and scala.

Best Regards Raphael.

4

3 に答える 3

1

私はSparkを初めて使用するので、これを行うためのより良い方法があるかもしれませんが、(複合キーに基づいて)ペアのRDDにマップしてから、「正しい」要素のみを使用してfullOuterJoinを実行できますか? 「左」と「右」の両方のデータがある結果のデータは?

大まかな擬似コード:

val pairRdd1 = rdd1 map {
  line => 
    (line(0)+line(1), line)
}

val pairRdd2 = rdd2 map {
  line => 
    (line(0)+line(1), line)
}

val joined = pairRdd1.fullOuterJoin(pairRdd2)

joined map {
  (id, left, right) =>
    right.getOrElse(left.get)
}

午前中に時間があれば、実用的な例をまとめてみます。それが役立つことを願っています!

于 2015-10-21T23:42:36.593 に答える
0

@Stevenの答えは論理的には良いようですが、データに交差する要素が多くない場合(つまり、完全な外部結合により巨大なデータセットが生成される場合)、問題が発生する可能性があります。DataFrames も使用しているため、RDD に変換してから DataFrames に戻すことは、DataFrames API で実行できるタスクには過剰に思えます。これを行う方法を以下に説明します。

いくつかのサンプルデータから始めましょう(例から取得):

val rdd1 = sc.parallelize(Array((1,1,"item A",10), (1,2,"item b",12), (1,3,"item c",12)))
val rdd2 = sc.parallelize(Array((1,2,"item b",50), (1,4,"item c",12), (1,5,"item c",12)))

次に、それらを別の列エイリアスで DataFrame に変換できます。最終的にこれら 2 つの DataFrame を結合すると、後続の選択をより簡単に記述できるため、全体df1とここで異なるエイリアスを使用します(結合後に列の元を識別する方法があれば、これは不要です)。df2両方の DataFrame の結合には、フィルタリングする行が含まれていることに注意してください。

val df1 = rdd1.toDF("id_location", "id_item", "name", "price")
val df2 = rdd2.toDF("id_location_2", "id_item_2", "name_2", "price_2")

// df1.unionAll(df2).show()
// +-----------+-------+------+-----+
// |id_location|id_item|  name|price|
// +-----------+-------+------+-----+
// |          1|      1|item A|   10|
// |          1|      2|item b|   12|
// |          1|      3|item c|   12|
// |          1|      2|item b|   50|
// |          1|      4|item c|   12|
// |          1|      5|item c|   12|
// +-----------+-------+------+-----+

df1ここでは、 と の最初の 2 つの要素であるキーで 2 つの DataFrame を結合することから始めdf2ます。次に、同じ結合キーを持つdf1行が存在する行 (基本的には から) を選択して、別の DataFrame を作成します。df2その後、except を実行してdf1、以前に作成した DataFrame からすべての行を削除します。df1基本的に行ったことは、 に同一のものがある場所("id_location", "id_item")からすべての行を削除することであるため、これは補足と見なすことができますdf2。最後に、補数を結合しdf2て、出力 DataFrame を生成します。

val df_joined = df1.join(df2, (df1("id_location") === df2("id_location_2")) && (df1("id_item") === df2("id_item_2")))
val df1_common_keyed = df_joined.select($"id_location", $"id_item", $"name", $"price")
val df1_complement = df1.except(df1_common_keyed)
val df_union = df1_complement.unionAll(df2)

// df_union.show()
// +-----------+-------+------+-----+
// |id_location|id_item|  name|price|
// +-----------+-------+------+-----+
// |          1|      3|item c|   12|
// |          1|      1|item A|   10|
// |          1|      2|item b|   50|
// |          1|      4|item c|   12|
// |          1|      5|item c|   12|
// +-----------+-------+------+-----+

繰り返しますが、@Steven が提案したように、DataFrame を RDD に変換して実行することで、RDD API を使用できます。それがあなたのやりたいことなら、以下はあなたが使いたいものsubtractByKey()と上記の入力RDDを達成する別の方法です:

val keyed1 = rdd1.keyBy { case (id_location, id_item, _, _) => (id_location, id_item) }
val keyed2 = rdd2.keyBy { case (id_location, id_item, _, _) => (id_location, id_item) }
val unionRDD = keyed1.subtractByKey(keyed2).values.union(rdd2)

// unionRDD.collect().foreach(println)
// (1,1,item A,10)
// (1,3,item c,12)
// (1,2,item b,50)
// (1,4,item c,12)
// (1,5,item c,12)
于 2015-10-22T06:59:29.557 に答える
0

@スティーブンは正しい考えを持っています。データセットをキーと値のペアにマッピングしてから、outerjoin

val rdd1 = sc.parallelize(List((1,1,"item A",10),(1,2,"item b",12),(1,3,"item c",12)))
val rdd2 = sc.parallelize(List((1,2,"item b",50),(1,4,"item c",12),(1,5,"item c",12)))

val rdd1KV = rdd1.map{case(id_location,id_item, name, price) => ((id_location, id_item), (name, price))}
val rdd2KV = rdd2.map{case(id_location,id_item, name, price) => ((id_location, id_item), (name, price))}

val joined = rdd1KV.fullOuterJoin(rdd2KV)

val res = joined.map{case((id_location, id_item),(leftOption, rightOption)) =>
    val values = rightOption.getOrElse(leftOption.get)
    (id_location, id_item, values._1, values._2)
}

これにより、探している結果が得られます。

于 2015-10-22T06:51:11.957 に答える