9

kafkaを使用する場合、kafka プロデューサーの kafka.compression.codec プロパティを設定することでコーデックを設定できます。

プロデューサーでスナッピー圧縮を使用するとします。カフカコンシューマーを使用してカフカからメッセージを消費する場合、スナッピーからデータをデコードするために何かをする必要がありますか、それともカフカコンシューマーの組み込み機能ですか?

関連ドキュメントでは、kafka コンシューマーのエンコーディングに関連するプロパティを見つけることができませんでした (プロデューサーのみに関連しています)。

誰かこれクリアできる?

4

2 に答える 2

15

私の理解によると、解凍は消費者自身によって処理されます。公式wikiページで述べたように The consumer iterator transparently decompresses compressed data and only returns an uncompressed message

この記事にあるように消費者の仕組みは次のとおりです。

コンシューマーにはバックグラウンドの「フェッチャー」スレッドがあり、ブローカーから 1MB のバッチでデータを継続的にフェッチし、それを内部ブロッキング キューに追加します。コンシューマ スレッドは、このブロッキング キューからデータをデキューし、メッセージを解凍して繰り返し処理します。

また、エンドツーエンドのバッチ圧縮の下のドキュメントページにも、次のように書かれています

メッセージのバッチをまとめて圧縮し、この形式でサーバーに送信できます。このメッセージのバッチは圧縮形式で書き込まれ、ログに圧縮されたままになり、コンシューマーによってのみ解凍されます。

したがって、解凍部分はコンシューマー自体で処理されるようです。必要なのは、compression.codecプロデューサーの作成中に ProducerConfig 属性を使用して有効な/サポートされている圧縮タイプを提供することだけです。消費者側での解凍のアプローチを示す例や説明が見つかりませんでした。間違っている場合は修正してください。

于 2013-11-11T09:22:47.500 に答える
0

私はv0.8.1で同じ問題を抱えており、Kafkaでのこの圧縮解凍は、コンシューマが圧縮されたデータを「透過的に」解凍する必要があることを除いて、文書化されていません。

Kafka Web サイトでConsumerIteratorを使用する高レベルのコンシューマー クライアントの例は、圧縮されていないデータでのみ機能します。Producer クライアントで圧縮を有効にすると、メッセージは次の "while" ループに入りません。うまくいけば、彼らはこの問題をできるだけ早く修正するか、この機能を要求しないでください。一部のユーザーは、バッチ処理と圧縮機能を必要とする大きなサイズのメッセージを転送するために Kafka を使用する可能性があるためです。

ConsumerIterator <byte[], byte[]> it = stream.iterator();
while(it.hasNext())
{
   String message = new String(it.next().message());
}
于 2014-10-09T00:33:24.330 に答える