3

BeamをFlinkKafkaProducer介して Avro データを読み書きしているときに、いくつかの問題に直面しています。FlinkKafkaConsumer

FlinkKafkaProducer誰かがAvroスキーマの実例と使用例を指摘できれば素晴らしいでしょうFlinkKafkaConsumer(Kafkaのコンフルエント版を使用していません)

A) BeamKafkaFlinkAvroProducerTest (プロデューサー)

KafkaProducer を直接使用した場合 (つまり、 ProduceSimpleData を呼び出した場合)、問題なく動作しています (テストのためだけに)。次の手順でUnboundedSource として使用FlinkKafkaProducerします (これが私がすべきことです) (つまり、 ProduceAvroData2を呼び出します)。

  1. まず、私が使用する場合AvroSerializationSchema schema = new AvroSerializationSchema(Test.class);

    つまり、基本的に Avro を使用しorg.apache.avro.specific.SpecificDatumWriterます。次のエラーに直面しています:

    Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.avro.generic.IndexedRecord
    at org.apache.avro.generic.GenericData.getField(GenericData.java:580)
    at org.apache.avro.generic.GenericData.getField(GenericData.java:595)
    at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:112)
    at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
    
  2. 次に、TypeInformationSerializationSchema(パイプラインの AvroCoder に関係なく) 使用すると、Kafka テスト コンシューマー ツールがメッセージを出力するため、問題なく動作するようです。

    java.lang.String{"uname": "Joe", "id": 6}
    

B) BeamKafkaFlinkAvroConsumerTest (消費者)

TypeInformationSerializationSchemaコンシューマーとプロデューサーの両方で使用するか、コンシューマーとプロデューサーでそれぞれ and を使用するAvroDeserializationSchema必要があることを理解してAvroSerializationSchemaいます。

しかし、AvroDeserializationSchemaorの使用に関係なくTypeInformationSerializationSchema、次の例外が発生します。

Exception in thread "main" java.lang.NullPointerException: null value in entry: V=null
at com.google.common.collect.CollectPreconditions.checkEntryNotNull(CollectPreconditions.java:33)
at com.google.common.collect.SingletonImmutableBiMap.<init>(SingletonImmutableBiMap.java:39)
at com.google.common.collect.ImmutableBiMap.of(ImmutableBiMap.java:49)
at com.google.common.collect.ImmutableMap.of(ImmutableMap.java:70)
at org.apache.beam.sdk.coders.CoderRegistry.getDefaultOutputCoder(CoderRegistry.java:221)

非常に基本的なものが欠けている可能性があります。すべてのコードはこちらです。

4

0 に答える 0