11

それぞれのスキーマでデータに対して Avro を使用して Kafka Consumerを実行しようとすると、「AvroRuntimeException: Malformed data. Length is negative: -40」というエラーが返されます。バイト配列を jsonAvro write および readKafka Avro Binary *coderに変換する同様の問題が他の人に見られます。また、このConsumer Group Exampleも参照しましたが、これはすべて役に立ちましたが、これまでのところこのエラーの助けにはなりません.コードのこの部分まで動作します(73行目)

デコーダー デコーダー = DecoderFactory.get().binaryDecoder(byteArrayInputStream, null);

私は他のデコーダーを試し、byteArrayInputStream 変数の内容を出力しました。これは、シリアル化された avro データがどのように見えると私が予想するかを示しています (メッセージでは、スキーマといくつかのデータといくつかの不正なデータを見ることができます)。 594 を返す .available() メソッドを使用して使用可能なバイト数。このエラーが発生する理由を理解できません。Apache Nifi を使用して、 hdfs から同じスキーマを持つ Kafka ストリームを生成します。助けていただければ幸いです。

4

1 に答える 1

22

おそらく問題は、Nifi による Avro データの書き込み (エンコード) 方法と、コンシューマー アプリによるデータの読み取り (デコード) 方法の不一致です。

簡単に言うと、Avro の API はシリアル化に対して 2 つの異なるアプローチを提供します。

  1. 適切な Avroファイルを作成するには: データ レコードをエンコードするだけでなく、Avro スキーマを一種のプリアンブルに埋め込む (経由org.apache.avro.file.{DataFileWriter/DataFileReader})。(a) 通常、Avro ファイルの「ペイロード」は、埋め込まれた Avro スキーマよりも桁違いに大きく、(b) これらのファイルを自由にコピーまたは移動できるため、スキーマを Avro ファイルに埋め込むことは非常に理にかなっています。それでも、誰かや何かに相談することなく、それらをもう一度読むことができることを確認してください.
  2. データ レコードのみをエンコードする場合、つまりスキーマを埋め込まない場合 ( を介してorg.apache.avro.io.{BinaryEncoder/BinaryDecoder}; パッケージ名の違いに注意してください: iohere とfileその上)。このアプローチは、Kafka トピックに書き込まれているメッセージを Avro エンコードするときによく好まれます。 (非常に合理的な) ポリシーは、同じ Kafka トピックの場合、メッセージは同じ Avro スキーマでフォーマット/エンコードされるというものです。これは大きな利点です。ストリーム データのコンテキストでは、移動中のデータのデータ レコードは通常、上記の保存データの Avro ファイル (多くの場合、数百または数千 MB); そのため、Avro スキーマのサイズは比較的大きいため、2000 個のデータ レコードを Kafka に書き込むときに 2000x を埋め込みたくありません。欠点は、「何とか」しなければならないことです。Avro スキーマが Kafka トピックにどのようにマッピングされるかを追跡します。より正確には、スキーマを直接埋め込むパスをたどることなく、メッセージがどの Avro スキーマでエンコードされたかを追跡する必要があります。良いニュースは、これを透過的に行うための Kafka エコシステム (Avro スキーマ レジストリ) で利用可能なツール。そのため、バリアント 1 と比較して、バリアント 2 は利便性を犠牲にして効率を上げています。

その結果、エンコードされた Avro データの「ワイヤ形式」は、上記の (1) または (2) のどちらを使用するかによって異なるように見えます。

私は Apache Nifi にあまり詳しくありませんが、ソース コード (たとえば、ConvertAvroToJSON.java ) をざっと見てみると、バリアント 1 を使用していることがわかります。つまり、Avro レコードと共に Avro スキーマが埋め込まれています。ただし、コンシューマー コードではDecoderFactory.get().binaryDecoder()バリアント 2 (スキーマが埋め込まれていない) が使用されます。

おそらく、これはあなたが遭遇したエラーを説明していますか?

于 2016-03-16T10:57:20.750 に答える