2

Spark 2.1 の構造化ストリーミングを使用して、コンテンツがバイナリ avro エンコードされた Kafka トピックから読み取ります。

したがって、次のように設定した後DataFrame:

val messages = spark
  .readStream
  .format("kafka")
  .options(kafkaConf)
  .option("subscribe", config.getString("kafka.topic"))
  .load()

このDataFrame( messages.printSchema()) のスキーマを出力すると、次のようになります。

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- timestampType: integer (nullable = true)

この質問は avro-decoding の問題と直交しているはずですがvalue、メッセージの内容をDataFrame関数Dataset[BusinessObject]によって何らかの形で , に変換したいと仮定しましょうArray[Byte] => BusinessObject。完全性の例として、関数は次のようになります (avro4s を使用):

case class BusinessObject(userId: String, eventId: String)

def fromAvro(bytes: Array[Byte]): BusinessObject =
    AvroInputStream.binary[BusinessObject](
        new ByteArrayInputStream(bytes)
    ).iterator.next

もちろん、miguno がこの関連する質問で言っているようにDataFrame.map()、で変換を適用することはできませんBusinessObject

それは次のように定義できます。

implicit val myEncoder : Encoder[BusinessObject] = org.apache.spark.sql.Encoders.kryo[BusinessObject]

次に、マップを実行します。

val transformedMessages : Dataset[BusinessObjecŧ] = messages.map(row => fromAvro(row.getAs[Array[Byte]]("value")))

しかし、新しいスキーマをクエリすると、次のようになります。

root
 |-- value: binary (nullable = true)

BusinessObjectデータセットはケースクラスの製品プロパティを使用して正しい値を取得する必要があるため、それは意味がないと思います。

.schema(StructType)リーダーで使用する Spark SQL の例をいくつか見てきましたが、 を使用しているという理由だけでなくreadStream、そのようなフィールドで操作できるようになる前に実際に列を変換する必要があるため、それを行うことはできません。

私は、transformedMessagesデータセット スキーマがStructFieldケース クラスのフィールドを持つものであることを Spark SQL エンジンに伝えたいと考えています。

4

1 に答える 1

1

私はあなたが求めるものを正確に手に入れると言うでしょう。今日すでに説明し たように、 with serialized オブジェクトがEncoders.kryo生成されます。blobその内部構造は SQL エンジンにとって不透明であり、オブジェクトを逆シリアル化しないとアクセスできません。したがって、コードが実際に行うことは、あるシリアル化形式を別の形式に置き換えることです。

DataFrameもう 1 つの問題は、動的に型指定された( Dataset[Row]) と静的に型指定されたオブジェクトを混在させようとすることです。UDT API を除外すると、Spark SQL はこのようには機能しません。静的に使用するDatasetか、階層DataFrameを使用してエンコードされたオブジェクト構造で使用します。struct

良いニュースはBusinessObject、不器用さを必要とせずにうまく機能するような単純な製品タイプですEncoders.kryo。Kryo エンコーダーの定義をスキップして、暗黙的なエンコーダーを必ずインポートしてください。

import spark.implicits._
于 2017-01-05T17:41:20.130 に答える