dstreamを受け取った後のkafkaとスキーマレジストリを使用したsparkストリーミングで、sparkでdstreamバッチをDataframeに変換するにはどうすればよいですか?
confluent から KafkaAvroDecoder を使用した後の Dstream の型は Dstream(String,Object) です。以下のコードを使用すると、avro 列で Int のようなスキーマ データ型が Long に変更されます。
val kafkaStream: DStream[(String, Object)] =
KafkaUtils.createDirectStream[String, Object, StringDecoder, KafkaAvroDecoder](
ssc, kafkaParams, Set(topic)
)
// Load JSON strings into DataFrame
kafkaStream.foreachRDD { rdd =>
// Get the singleton instance of SQLContext
val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
import sqlContext.implicits._
val topicValueStrings = rdd.map(_._2.toString)
val df = sqlContext.read.json(topicValueStrings)
Object.toSting と json としての読み取りにより、int のスキーマが失われます。データフレーム列で型をキャストする代わりに他の方法はありますか?