S3 フォルダーにストリーミングされた xml ファイルを処理する必要があります。現在、以下のように実装しています。
まず、Spark の fileStream を使用してファイルを読み取ります
val data = ssc.fileStream[LongWritable, Text, TextInputFormat]("s3://myfolder/",(t: org.apache.hadoop.fs.Path) => true, newFilesOnly = true, hadoopConf).map(_._2.toString())
RDDごとに、ファイルが読み取られたかどうかを確認します
if (data.count() !=0)
文字列を新しい HDFS ディレクトリに書き込みます
data.coalesce(1).saveAsTextFile(sdir);
上記の HDFS ディレクトリから読み取る Dataframe を作成します。
val loaddata = sqlContext.read.format("com.databricks.spark.xml").option("rowTag", "Trans").load(sdir)
Dataframe で何らかの処理を行い、JSON として保存します
loaddata.write.mode("append").json("s3://mybucket/somefolder")
どういうわけか、上記のアプローチは非常に非効率的であり、率直に言って非常に男子生徒的であると感じています. より良い解決策はありますか?どんな助けでも大歓迎です。
フォローアップの質問: データフレーム内のフィールド (列ではない) を操作するにはどうすればよいですか? 非常に複雑なネストされた xml があり、上記の方法を使用すると、9 列と 50 個の奇妙な内部構造体配列を持つデータフレームが得られます。特定のフィールド名を削除する必要があることを除けば、これで問題ありません。同じ構造を再度構築する必要があるため、データフレームを分解せずにそれを達成する方法はありますか?