BeamをFlinkKafkaProducer
介して Avro データを読み書きしているときに、いくつかの問題に直面しています。FlinkKafkaConsumer
FlinkKafkaProducer
誰かがAvroスキーマの実例と使用例を指摘できれば素晴らしいでしょうFlinkKafkaConsumer
(Kafkaのコンフルエント版を使用していません)
A) BeamKafkaFlinkAvroProducerTest (プロデューサー)
KafkaProducer を直接使用した場合 (つまり、 ProduceSimpleData を呼び出した場合)、問題なく動作しています (テストのためだけに)。次の手順でUnboundedSource として使用FlinkKafkaProducer
します (これが私がすべきことです) (つまり、 ProduceAvroData2を呼び出します)。
まず、私が使用する場合
AvroSerializationSchema schema = new AvroSerializationSchema(Test.class);
つまり、基本的に Avro を使用し
org.apache.avro.specific.SpecificDatumWriter
ます。次のエラーに直面しています:Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.avro.generic.IndexedRecord at org.apache.avro.generic.GenericData.getField(GenericData.java:580) at org.apache.avro.generic.GenericData.getField(GenericData.java:595) at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:112) at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
次に、
TypeInformationSerializationSchema
(パイプラインの AvroCoder に関係なく) 使用すると、Kafka テスト コンシューマー ツールがメッセージを出力するため、問題なく動作するようです。java.lang.String{"uname": "Joe", "id": 6}
B) BeamKafkaFlinkAvroConsumerTest (消費者)
TypeInformationSerializationSchema
コンシューマーとプロデューサーの両方で使用するか、コンシューマーとプロデューサーでそれぞれ and を使用するAvroDeserializationSchema
必要があることを理解してAvroSerializationSchema
います。
しかし、AvroDeserializationSchema
orの使用に関係なくTypeInformationSerializationSchema
、次の例外が発生します。
Exception in thread "main" java.lang.NullPointerException: null value in entry: V=null
at com.google.common.collect.CollectPreconditions.checkEntryNotNull(CollectPreconditions.java:33)
at com.google.common.collect.SingletonImmutableBiMap.<init>(SingletonImmutableBiMap.java:39)
at com.google.common.collect.ImmutableBiMap.of(ImmutableBiMap.java:49)
at com.google.common.collect.ImmutableMap.of(ImmutableMap.java:70)
at org.apache.beam.sdk.coders.CoderRegistry.getDefaultOutputCoder(CoderRegistry.java:221)
非常に基本的なものが欠けている可能性があります。すべてのコードはこちらです。