8

Kafka Web サイトのドキュメントを読みましたが、完全な最小限の例 (プロデューサー --> kafka --> コンシューマー) を実装しようとした後、「コンシューマー状態」、オフセットを処理する必要がある方法がよくわかりません。

いくつかの情報

  1. HighLevel API (Java) を使用しています
  2. 私のコンシューマはメインを持つ単純なクラスで、基本的に「クイックスタート」Kafka ページにあるものと同じです。
  3. Zookeeper を使用しています
  4. 私は単一のブローカーを使用しています

さて、ドキュメントによると、HighLevel API コンシューマは Zookeeper を使用してその状態を保存するため、オフセットが予想されるため、コンシューマの状態は次の間で維持されます。

  • Kafka ブローカーの再起動
  • コンシューマの再起動

しかし、残念ながらそうではありません。ブローカーまたはコンシューマーを再起動するたびに、すべてのメッセージが再配信されます。さて、おそらくこれらはばかげた質問ですが、

  1. Kafkaの再起動の場合:状態を維持するのは消費者次第だと理解したので、おそらくブローカーが(再)起動してすべての(!)メッセージを再配信し、消費者が何を消費するかを決定します...そうですか?その場合、10.0000.0000 件のメッセージがあるとどうなりますか?

  2. JVM コンシューマーの再起動の場合: 状態が Zookeeper で保持されている場合、メッセージが再配信されるのはなぜですか? 新しい JVM が別のコンシューマー「ID」を持つ可能性はありますか? この場合、以前の ID をバインドするにはどうすればよいでしょうか?

4

3 に答える 3

4

はい、コンシューマーはその状態を維持する責任があり、Javaの高レベルのコンシューマーはその状態をzookeeperに保存します。

ほとんどの場合、groupId構成プロパティを指定していません。そのような状況では、kafkaはランダムに生成しgroupIdます。

autocommit.enable構成プロパティをオフにした可能性もあります。

Kafka構成の完全なリファレンスは、次のページにあります: http: //kafka.apache.org/configuration.html 「高レベルのコンシューマーの重要な構成プロパティ」タイトル。

于 2013-02-12T19:27:38.340 に答える
3

私は悪い読者だったようです...それはすべて構成ページにあります。具体的には、デフォルトで「最小」に設定されているフラグ「autooffset.reset」を設定することで、両方の質問が解決されたため、説明した効果が発生します。

現在、値として「最大」を使用すると、オフセットが常に最大であるため、コンシューマーとブローカーの両方の再起動の場合に、物事は期待どおりに機能しています。

于 2013-02-13T09:13:43.317 に答える