4

s3 ファイルからの次の DataFrame 入力があり、データを次の目的の出力に変換する必要があります。Scala で Spark バージョン 1.5.1 を使用していますが、Python で Spark に変更できます。どんな提案でも大歓迎です。

データフレーム入力:

name    animal   data
john    mouse    aaaaa
bob     mouse    bbbbb
bob     mouse    ccccc
bob     dog      ddddd

望ましい出力:

john/mouse/file.csv
bob/mouse/file.csv
bob/dog/file.csv

terminal$ cat bob/mouse/file.csv
bbbbb
ccccc

terminal$ cat bob/dog/file.csv
ddddd

以下は、私が試した既存の Spark Scala コードです。

val sc = new SparkContext(new SparkConf())
val sqlc = new org.apache.spark.sql.SQLContext(sc)
val df = sqlc.read.json("raw.gz")
val cols = Seq("name", "animal")
df.groupBy(cols.head, cols.tail: _*).count().take(100).foreach(println)

現在の出力:

[john,mouse,1]
[bob,mouse,2]
[bob,dog,1]

私の既存のコードの問題のいくつかは、groupBy が GroupedData オブジェクトを返し、おそらくそのデータに対して count/sum/agg 関数を実行したくないということです。データをグループ化して出力するためのより良い手法を探しています。データセットは非常に大きいです。

4

1 に答える 1

5

これはpartitionBy、 のオプションを使用して実現できますDataFrameWriter。一般的な構文は次のとおりです。

df.write.partitionBy("name", "animal").format(...).save(...)

残念ながら、Spark 1.5 でパーティショニングをサポートするプレーン テキスト形式は JSON のみです。

Spark のインストールを次のように更新できる場合:

  • partitionBy1.6 - フォーマットで使用できtextます。グループ ( ) に対して単一の出力ファイルが必要な場合は、1.6 も必要ですrepartition
  • partitionBy2.0 - フォーマットで使用できcsvます。

1.5 では、ファイルを JSON として書き込み、個々の出力ファイルを変換するのが最善の方法だと思います。

個別の数name', 'animalsが少ない場合は、グループごとに個別の書き込みを実行できます。

val dist = df.select("name", "animal").rdd.collect.map {
  case Row(name: String, animal: String) => (name, animal)
}

for {
  (name, animal) <- dist
} df.where($"name" === name && $"animal" === animal)
    .select($"data").write.format("csv").save(s"/prefix/$name/$animal")

ただし、組み合わせの数が増えると、これはスケーリングされません。

于 2016-11-11T19:36:02.070 に答える