2

S3 フォルダーにストリーミングされた xml ファイルを処理する必要があります。現在、以下のように実装しています。

まず、Spark の fileStream を使用してファイルを読み取ります

val data = ssc.fileStream[LongWritable, Text, TextInputFormat]("s3://myfolder/",(t: org.apache.hadoop.fs.Path) => true, newFilesOnly = true, hadoopConf).map(_._2.toString())

RDDごとに、ファイルが読み取られたかどうかを確認します

if (data.count() !=0)

文字列を新しい HDFS ディレクトリに書き込みます

data.coalesce(1).saveAsTextFile(sdir);

上記の HDFS ディレクトリから読み取る Dataframe を作成します。

val loaddata = sqlContext.read.format("com.databricks.spark.xml").option("rowTag", "Trans").load(sdir)

Dataframe で何らかの処理を行い、JSON として保存します

loaddata.write.mode("append").json("s3://mybucket/somefolder")

どういうわけか、上記のアプローチは非常に非効率的であり、率直に言って非常に男子生徒的であると感じています. より良い解決策はありますか?どんな助けでも大歓迎です。

フォローアップの質問: データフレーム内のフィールド (列ではない) を操作するにはどうすればよいですか? 非常に複雑なネストされた xml があり、上記の方法を使用すると、9 列と 50 個の奇妙な内部構造体配列を持つデータフレームが得られます。特定のフィールド名を削除する必要があることを除けば、これで問題ありません。同じ構造を再度構築する必要があるため、データフレームを分解せずにそれを達成する方法はありますか?

4

1 に答える 1

4

Spark 2.0 を使用している場合は、構造化ストリーミングで動作させることができる場合があります。

val inputDF = spark.readStream.format("com.databricks.spark.xml")
  .option("rowTag", "Trans")
  .load(path)
于 2016-11-18T15:41:45.390 に答える