1

dstreamを受け取った後のkafkaとスキーマレジストリを使用したsparkストリーミングで、sparkでdstreamバッチをDataframeに変換するにはどうすればよいですか?

confluent から KafkaAvroDecoder を使用した後の Dstream の型は Dstream(String,Object) です。以下のコードを使用すると、avro 列で Int のようなスキーマ データ型が Long に変更されます。

val kafkaStream: DStream[(String, Object)] =
      KafkaUtils.createDirectStream[String, Object, StringDecoder, KafkaAvroDecoder](
    ssc, kafkaParams, Set(topic)
      )

  // Load JSON strings into DataFrame
  kafkaStream.foreachRDD { rdd =>
    // Get the singleton instance of SQLContext
    val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
    import sqlContext.implicits._

val topicValueStrings = rdd.map(_._2.toString)
    val df = sqlContext.read.json(topicValueStrings)

コード参照

Object.toSting と json としての読み取りにより、int のスキーマが失われます。データフレーム列で型をキャストする代わりに他の方法はありますか?

4

0 に答える 0