Kafka Streams で Avro データをストリーミングしようとすると、次のエラーが発生しました。
Exception in thread "StreamThread-1" org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
メーリングリストでそれに関するいくつかの古いスレッドを見つけましたが、そこに記載されている解決策はどれも問題を解決しませんでした. うまくいけば、ここで解決策を見つけることができます。
私のセットアップは次のようになります。
StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName
StreamsConfig.VALUE_SERDE_CLASS_CONFIG, classOf[GenericAvroSerde]
AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, localhost:8081)
KEY_SERDE
をと同じに設定しようとしましたVALUE_SERDE
が、これはメーリング リストで解決策として「マーク」されていましたが、私の場合はうまくいきませんでした。
GenericData.Record
次のようにスキーマを使用し て生成しています。
val record = new GenericData.Record(schema)
...
record.put(field, value)
デバッグ モードを開始して生成されたレコードを確認すると、すべて問題なく表示され、レコードにデータがあり、マッピングが正しいです。
KStream を次のようにストリーミングします (以前はブランチを使用していました)。
splitTopics.get(0).to(s"${destTopic}_Testing")
記録用に使用GenericData.Record
しています。これは との組み合わせで問題になる可能性がありGenericAvroSerde
ますか?