9

Spark ストリーミングを使用して kafka からメッセージを読み取る Spark 2.0 アプリケーションがあります (spark-streaming-kafka-0-10_2.11 を使用)。

構造化されたストリーミングはとてもクールに見えるので、コードを移行してみようと思ったのですが、使い方がわかりません。

通常のストリーミングでは kafkaUtils を使用して createDstrean を使用し、渡したパラメーターでは値デシリアライザーでした。

構造化ストリーミングでは、ドキュメントには DataFrame 関数を使用して逆シリアル化する必要があると書かれていますが、それが何を意味するのか正確にはわかりません。

この例などの例を見ましたが、Kafka の Avro オブジェクトは非常に複雑で、例の String のように単純にキャストすることはできません..

これまでのところ、この種のコードを試しました(別の質問でここで見ました):

import spark.implicits._

  val ds1 = spark.readStream.format("kafka").
    option("kafka.bootstrap.servers","localhost:9092").
    option("subscribe","RED-test-tal4").load()

  ds1.printSchema()
  ds1.select("value").printSchema()
  val ds2 = ds1.select($"value".cast(getDfSchemaFromAvroSchema(Obj.getClassSchema))).show()  
  val query = ds2.writeStream
    .outputMode("append")
    .format("console")
    .start()

そして、「データ型の不一致: BinaryType を StructType(StructField(....) にキャストできません」

値を逆シリアル化するにはどうすればよいですか?

4

4 に答える 4

3

Spark のシリアライゼーションが新しい/実験的な構造化ストリーミングとどのように組み合わされて機能するかについては、まだあまり詳しくありませんが、以下のアプローチは機能しますが、それが最善の方法であるかどうかはわかりません (IMHO のアプローチはややぎこちなく見えます。感じられる)。

Foo具体的には Avro ではなく、カスタム データ型 (ここではケース クラス) の例であなたの質問に答えようとしますが、とにかくそれが役に立てば幸いです。Kryo シリアライゼーションを使用してカスタム型をシリアライズ/デシリアライズするという考え方です。Sparkドキュメントの「チューニング: データのシリアライゼーション」を参照してください。

注: Spark は、組み込みの (暗黙的な) エンコーダーを使用して、そのままでケース クラスのシリアル化をサポートします。このエンコーダーは、 import spark.implicits._. しかし、この例のためにこの機能を無視しましょう。

次のケース クラスをカスタム タイプとして定義したとします (TL;DR ヒント: 奇妙な Spark シリアライゼーションの苦情/エラーが発生しないようにするには、コードを別のファイルFooに入れる必要があります)。Foo.scala

// This could also be your auto-generated Avro class/type
case class Foo(s: String)

これで、Kafka からデータを読み取るための構造化ストリーミング コードが次のようになりました。ここで、入力トピックには、メッセージ値がバイナリ エンコードされた Kafka メッセージが含まれていStringます。目標は、Fooこれらのメッセージ値に基づいてインスタンスを作成することです (つまり、 d バイナリ データを Avro クラスのインスタンスに逆シリアル化します):

val messages: DataFrame = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
    .option("subscribe", "my-input-topic")
    .load()

ここで、値をカスタム型のインスタンスにデシリアライズFooしています。これには、まず Implicit を定義する必要がありますEncoder[Foo]

implicit val myFooEncoder: Encoder[Foo] = org.apache.spark.sql.Encoders.kryo[Foo]
val foos: Dataset[Foo] = messages.map(row => Foo(new String(row.getAs[Array[Byte]]("value")))

Avro の質問に戻ると、次のことを行う必要があります。

  1. ニーズに合わせて適切Encoderに作成します。
  2. Foo(new String(row.getAs[Array[Byte]]("value"))バイナリ エンコードされた Avro データを Avro POJO に逆シリアル化するコードに置き換えます。つまり、バイナリ エンコードされた Avro データをメッセージ値 ( ) から取り出して、Avro など、他の場所で定義したものをrow.getAs[Array[Byte]]("value")返すコードです。GenericRecordSpecificCustomAvroObject

他の誰かがタルの質問に答えるより簡潔な/より良い/...方法を知っているなら、私はすべて耳を傾けます。:-)

以下も参照してください。

于 2016-11-21T10:23:41.340 に答える