3

Spark ストリーミング メソッドを使用して、ディレクトリにドロップされた XML データを正常に解析しfileStream、結果の RDD をテキスト ファイルに書き出すことができます。

val fStream = {
  ssc.fileStream[LongWritable, Text, XmlInputFormat](
    WATCHDIR, xmlFilter _, newFilesOnly = false, conf = hadoopConf)
}


fStream.foreachRDD(rdd =>
  if (rdd.count() == 0) {
    logger.info("No files..")
  })

val dStream = fStream.map{ case(x, y) =>
  logger.info("Hello from the dStream")
  logger.info(y.toString)
  scalaxb.fromXML[Music](scala.xml.XML.loadString(y.toString))
}

dStream.foreachRDD(rdd => rdd.saveAsTextFile("file:///tmp/xmlout"))

問題は、RDD を一時テーブルとして登録するために DataFrame に変換するか、saveAsParquetFile.

このコード:

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
dStream.foreachRDD(rdd => rdd.distinct().toDF().printSchema())

次のエラーが発生します。

java.lang.UnsupportedOperationException: Schema for type scalaxb.DataRecord[scala.Any] is not supported

sinceは自分のレコードのケース クラスを生成し、Spark が Reflection を使用して推論するのscalaxbは簡単だと思っていたでしょう。Scalaxb によって生成されたケース クラスを Spark と互換性のあるものにする方法についてアイデアを持っている Spark または Scalaxb の専門家はいますか?scalaxb.DataRecord

ところで、scalaxb から生成されたクラスは次のとおりです。

package generated

case class Song(attributes: Map[String, scalaxb.DataRecord[Any]] = Map()) {
  lazy val title = attributes.get("@title") map { _.as[String] }
  lazy val length = attributes.get("@length") map { _.as[String] }
}

case class Album(song: Seq[generated.Song] = Nil,
  description: String,
  attributes: Map[String, scalaxb.DataRecord[Any]] = Map()) {
  lazy val title = attributes.get("@title") map { _.as[String] }
}

case class Artist(album: Seq[generated.Album] = Nil,
  attributes: Map[String, scalaxb.DataRecord[Any]] = Map()) {
  lazy val name = attributes.get("@name") map { _.as[String] }
}

case class Music(artist: Seq[generated.Artist] = Nil)
4

0 に答える 0