おそらく問題は、Nifi による Avro データの書き込み (エンコード) 方法と、コンシューマー アプリによるデータの読み取り (デコード) 方法の不一致です。
簡単に言うと、Avro の API はシリアル化に対して 2 つの異なるアプローチを提供します。
- 適切な Avroファイルを作成するには: データ レコードをエンコードするだけでなく、Avro スキーマを一種のプリアンブルに埋め込む (経由
org.apache.avro.file.{DataFileWriter/DataFileReader}
)。(a) 通常、Avro ファイルの「ペイロード」は、埋め込まれた Avro スキーマよりも桁違いに大きく、(b) これらのファイルを自由にコピーまたは移動できるため、スキーマを Avro ファイルに埋め込むことは非常に理にかなっています。それでも、誰かや何かに相談することなく、それらをもう一度読むことができることを確認してください.
- データ レコードのみをエンコードする場合、つまりスキーマを埋め込まない場合 ( を介して
org.apache.avro.io.{BinaryEncoder/BinaryDecoder}
; パッケージ名の違いに注意してください: io
here とfile
その上)。このアプローチは、Kafka トピックに書き込まれているメッセージを Avro エンコードするときによく好まれます。 (非常に合理的な) ポリシーは、同じ Kafka トピックの場合、メッセージは同じ Avro スキーマでフォーマット/エンコードされるというものです。これは大きな利点です。ストリーム データのコンテキストでは、移動中のデータのデータ レコードは通常、上記の保存データの Avro ファイル (多くの場合、数百または数千 MB); そのため、Avro スキーマのサイズは比較的大きいため、2000 個のデータ レコードを Kafka に書き込むときに 2000x を埋め込みたくありません。欠点は、「何とか」しなければならないことです。Avro スキーマが Kafka トピックにどのようにマッピングされるかを追跡します。より正確には、スキーマを直接埋め込むパスをたどることなく、メッセージがどの Avro スキーマでエンコードされたかを追跡する必要があります。良いニュースは、これを透過的に行うための Kafka エコシステム (Avro スキーマ レジストリ) で利用可能なツール。そのため、バリアント 1 と比較して、バリアント 2 は利便性を犠牲にして効率を上げています。
その結果、エンコードされた Avro データの「ワイヤ形式」は、上記の (1) または (2) のどちらを使用するかによって異なるように見えます。
私は Apache Nifi にあまり詳しくありませんが、ソース コード (たとえば、ConvertAvroToJSON.java ) をざっと見てみると、バリアント 1 を使用していることがわかります。つまり、Avro レコードと共に Avro スキーマが埋め込まれています。ただし、コンシューマー コードではDecoderFactory.get().binaryDecoder()
バリアント 2 (スキーマが埋め込まれていない) が使用されます。
おそらく、これはあなたが遭遇したエラーを説明していますか?