セットの分析をしようとしています。次のようなサンプル データ セットがあります。
注文.json
{"items":[1,2,3,4,5]}
{"items":[1,2,5]}
{"items":[1,3,5]}
{"items":[3,4,5]}
それは、ID を表す数字のリストである単一のフィールドだけです。
実行しようとしているSparkスクリプトは次のとおりです。
val sparkConf = new SparkConf()
.setMaster("local[*]")
.setAppName("Dataframe Test")
val sc = new SparkContext(sparkConf)
val sql = new SQLContext(sc)
val dataframe = sql.read.json("orders.json")
val expanded = dataframe
.explode[::[Long], Long]("items", "item1")(row => row)
.explode[::[Long], Long]("items", "item2")(row => row)
val grouped = expanded
.where(expanded("item1") !== expanded("item2"))
.groupBy("item1", "item2")
.count()
val recs = grouped
.groupBy("item1")
作成expanded
しgrouped
て問題ありません。一言で言えばexpanded
、2 つの ID が同じ元のセットにあった、2 つの ID のすべての可能なセットのリストです。grouped
自分自身と一致した ID を除外し、ID のすべての一意のペアをグループ化し、それぞれのカウントを生成します。のスキーマとデータ サンプルは次のgrouped
とおりです。
root
|-- item1: long (nullable = true)
|-- item2: long (nullable = true)
|-- count: long (nullable = false)
[1,2,2]
[1,3,2]
[1,4,1]
[1,5,3]
[2,1,2]
[2,3,1]
[2,4,1]
[2,5,2]
...
だから、私の質問は、タプルのリストを得るために、各結果の最初の項目をグループ化するにはどうすればよいですか? 上記のサンプル データの場合、次のようなものが期待できます。
[1, [(2, 2), (3, 2), (4, 1), (5, 3)]]
[2, [(1, 2), (3, 1), (4, 1), (5, 2)]]
のスクリプトでわかるように、recs
各行の最初のアイテムである「item1」で groupBy を実行することから始めると思いました。しかしその後は、この GroupedData オブジェクトに対するアクションが非常に制限されたままになります。実際には、sum、avg などの集計を行うだけです。各結果のタプルを一覧表示したいだけです。
この時点で RDD 関数を簡単に使用できましたが、それはデータフレームの使用から逸脱しています。データフレーム関数でこれを行う方法はありますか。