4

私は kafka-node (kafka のノード クライアント) を使用しており、コンシューマーを使用してトピックに関するメッセージを取得しています。残念ながら、「offsetOutOfRange」状態を受け取ります (offsetOutOfRange コールバックが呼び出されます)。私のアプリケーションは、消費者が生産者から大幅に遅れて、最初のオフセットと最新のオフセットの間に多少大きなギャップが残るまで、正常に動作していました。この時点で、私は (おそらく間違って) コンシューマがメッセージを受信し続けることができる (できればプロデューサーに追いつくことができる) と想定しました。

私のカフカ コンシューマ クライアント コードは次のとおりです。

:
:
var kafka = require('kafka-node');

var zookeeper = "10.0.1.201:2181";
var id = "embClient";

var Consumer = kafka.Consumer;
var client = new kafka.Client(zookeeper, id);
var consumer = new Consumer( client, [ { topic: "test", partition: 0 } ], { autoCommit: false } );

consumer.on('error', [error callback...]);

consumer.on('offsetOutOfRange', [offset error callback...]);

consumer.on('message', [message callback...]);
:
:

私は何か間違ったことをしていますか、それとも何かが欠けていますか?

そうでない場合は、いくつか質問があります。

(a) この状態を適切に処理するようにクライアントを記述する、認められた「最善の」方法はありますか?

(b) なぜこの条件が提起されるのでしょうか? (クライアントは中断したところからメッセージを読み続け、最終的に (理想的には) 追いつくことができたはずだと思います...)

(c) この条件を処理するコード/ロジックを記述し、コンシューマー オフセットを明示的に再配置して読み取る必要がありますか? (これは少し面倒なようです)...

どんな助けでも大歓迎です。

4

1 に答える 1

7

Kafka では利用できなくなったメッセージをアプリが読み取ろうとした可能性があると思います。Kafka は log.retention.* プロパティに基づいて古いメッセージを削除します。Kafka に 1000 件のメッセージを送信したとします。保持のため、Kafka は最初の 500 件のメッセージを削除しました。アプリがメッセージ 350 を読み取ろうとすると失敗し、offsetOutOfRange エラーが発生します。これは、コンシューマーが遅すぎて、コンシューマーが処理する前に Kafka ブローカーがメッセージを削除してしまったために発生した可能性があります。または、コンシューマーがクラッシュしましたが、最後に処理されたメッセージのオフセットがどこかに保存されていました。

Offset クラスを使用して、利用可能な最新/最古のオフセットを取得し (メソッドを参照fetch)、コンシューマのオフセットを更新できます。この方法を使用します。

一般に、この状況が発生したときにアプリが何をすべきかを判断するのは簡単ではありません。明らかに何かが間違っているからです。

お役に立てば幸いです、Lukáš

于 2015-08-25T20:43:48.720 に答える