私は kafka-node (kafka のノード クライアント) を使用しており、コンシューマーを使用してトピックに関するメッセージを取得しています。残念ながら、「offsetOutOfRange」状態を受け取ります (offsetOutOfRange コールバックが呼び出されます)。私のアプリケーションは、消費者が生産者から大幅に遅れて、最初のオフセットと最新のオフセットの間に多少大きなギャップが残るまで、正常に動作していました。この時点で、私は (おそらく間違って) コンシューマがメッセージを受信し続けることができる (できればプロデューサーに追いつくことができる) と想定しました。
私のカフカ コンシューマ クライアント コードは次のとおりです。
:
:
var kafka = require('kafka-node');
var zookeeper = "10.0.1.201:2181";
var id = "embClient";
var Consumer = kafka.Consumer;
var client = new kafka.Client(zookeeper, id);
var consumer = new Consumer( client, [ { topic: "test", partition: 0 } ], { autoCommit: false } );
consumer.on('error', [error callback...]);
consumer.on('offsetOutOfRange', [offset error callback...]);
consumer.on('message', [message callback...]);
:
:
私は何か間違ったことをしていますか、それとも何かが欠けていますか?
そうでない場合は、いくつか質問があります。
(a) この状態を適切に処理するようにクライアントを記述する、認められた「最善の」方法はありますか?
(b) なぜこの条件が提起されるのでしょうか? (クライアントは中断したところからメッセージを読み続け、最終的に (理想的には) 追いつくことができたはずだと思います...)
(c) この条件を処理するコード/ロジックを記述し、コンシューマー オフセットを明示的に再配置して読み取る必要がありますか? (これは少し面倒なようです)...
どんな助けでも大歓迎です。