Spark 2.1 の構造化ストリーミングを使用して、コンテンツがバイナリ avro エンコードされた Kafka トピックから読み取ります。
したがって、次のように設定した後DataFrame
:
val messages = spark
.readStream
.format("kafka")
.options(kafkaConf)
.option("subscribe", config.getString("kafka.topic"))
.load()
このDataFrame
( messages.printSchema()
) のスキーマを出力すると、次のようになります。
root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: long (nullable = true)
|-- timestamp: long (nullable = true)
|-- timestampType: integer (nullable = true)
この質問は avro-decoding の問題と直交しているはずですがvalue
、メッセージの内容をDataFrame
関数Dataset[BusinessObject]
によって何らかの形で , に変換したいと仮定しましょうArray[Byte] => BusinessObject
。完全性の例として、関数は次のようになります (avro4s を使用):
case class BusinessObject(userId: String, eventId: String)
def fromAvro(bytes: Array[Byte]): BusinessObject =
AvroInputStream.binary[BusinessObject](
new ByteArrayInputStream(bytes)
).iterator.next
もちろん、miguno がこの関連する質問で言っているようにDataFrame.map()
、で変換を適用することはできませんBusinessObject
。
それは次のように定義できます。
implicit val myEncoder : Encoder[BusinessObject] = org.apache.spark.sql.Encoders.kryo[BusinessObject]
次に、マップを実行します。
val transformedMessages : Dataset[BusinessObjecŧ] = messages.map(row => fromAvro(row.getAs[Array[Byte]]("value")))
しかし、新しいスキーマをクエリすると、次のようになります。
root
|-- value: binary (nullable = true)
BusinessObject
データセットはケースクラスの製品プロパティを使用して正しい値を取得する必要があるため、それは意味がないと思います。
.schema(StructType)
リーダーで使用する Spark SQL の例をいくつか見てきましたが、 を使用しているという理由だけでなくreadStream
、そのようなフィールドで操作できるようになる前に実際に列を変換する必要があるため、それを行うことはできません。
私は、transformedMessages
データセット スキーマがStructField
ケース クラスのフィールドを持つものであることを Spark SQL エンジンに伝えたいと考えています。