Spark ストリーミングを使用して kafka からメッセージを読み取る Spark 2.0 アプリケーションがあります (spark-streaming-kafka-0-10_2.11 を使用)。
構造化されたストリーミングはとてもクールに見えるので、コードを移行してみようと思ったのですが、使い方がわかりません。
通常のストリーミングでは kafkaUtils を使用して createDstrean を使用し、渡したパラメーターでは値デシリアライザーでした。
構造化ストリーミングでは、ドキュメントには DataFrame 関数を使用して逆シリアル化する必要があると書かれていますが、それが何を意味するのか正確にはわかりません。
この例などの例を見ましたが、Kafka の Avro オブジェクトは非常に複雑で、例の String のように単純にキャストすることはできません..
これまでのところ、この種のコードを試しました(別の質問でここで見ました):
import spark.implicits._
val ds1 = spark.readStream.format("kafka").
option("kafka.bootstrap.servers","localhost:9092").
option("subscribe","RED-test-tal4").load()
ds1.printSchema()
ds1.select("value").printSchema()
val ds2 = ds1.select($"value".cast(getDfSchemaFromAvroSchema(Obj.getClassSchema))).show()
val query = ds2.writeStream
.outputMode("append")
.format("console")
.start()
そして、「データ型の不一致: BinaryType を StructType(StructField(....) にキャストできません」
値を逆シリアル化するにはどうすればよいですか?