2

HighLevel コンシューマーを実装するために Apache kafka を調べてきました (メッセージをいじりたくないので、単にデータを MongoDB に入れる必要があるだけです) v0.8.1.1

コンシューマーの実装方法に関する非常に詳細な情報を示す以下のリンクを見てきました。

Apache Kafka コンシューマー wiki 別のカフカ コンシューマー

しかし、すべてのスレッドがシャットダウンされた後に Consumer がどのように再起動するかについては、まだわかりません。たとえば、実行中のコンシューマーの 4 つのスレッドがあり、それらが kafka ブローカーからのすべてのメッセージを消費したとしましょう。メッセージがなくなると、すべてのコンシューマーは何もせず、特定のタイムアウト後にシャットダウンされるため、コンシューマーが再びどのようになっているのかわかりませんkafka ブローカーに新しいメッセージがあると再起動します。

誰かがコードを共有したり、少なくともこれに関するいくつかの指針を共有したりできますか? また、while ループの代わりに、メッセージがあるときに呼び出されるコールバック メソッドにビジネス ロジックを含める方法もあります。

4

1 に答える 1

3

シャットダウン中のタイムアウトの使用を誤解している可能性があると思います。理論的には、イベント間の時間に関係なく、無限のイベント ストリームを消費しているため、コードを更新するか、マシンがクラッシュしない限り、consuner をシャットダウンしないでください実際にコンシューマーをシャットダウンする必要がある場合、10000 ミリ秒のタイムアウトにより、Kafka コンシューマーが最後に読み取ったオフセットを ZooKeeper に書き込むのに十分な時間が与えられます。これにより、コンシューマーが再起動されたときに、最後に処理したオフセットから再開されます。このコンシューマーのシャットダウンは通常、コンシューマーだけでなく、プログラムがシャットダウンされた (おそらく InterruptedException がキャッチされた) ときに発生します。したがって、プログラムが再起動されると、consuner が再起動されます。

編集

Kafka のConsumerIteratorがこの終わりのない消費モデルに従う理由を付け加えておきます。イテレータのnextメソッドは、次のメッセージを読み取れるまで常にブロックされます。したがって、この例のタイムアウトに到達する唯一の方法は、何らかの例外によってコンシューマ スレッドがシャットダウンされた場合です。

編集2

コールバックをサポートする Kafka コンシューマー API は見たことがありません。現時点で唯一のオプションは、独自のコールバック実装を作成することだと思います。たとえば、次のようになります。

public interface Callback {
  void call(MessageAndMetadata message);
}

Executor executor = Executors.newCachedThreadPool();
final Callback<byte[], byte[]> callback = new MyCallback();
while (it.hasNext()) {
  final MessageAndMetadata message = it.next();
  executor.submit(new Runnable() {
    public void run() {
      callback.call(message);
    }
  });
}

彼らが現在 Kafka 0.9 のコンシューマー API を書き直していることに興味があるかもしれませんが、書き直しでコールバックを見たことはないと思います (間違っている可能性もあります)。

于 2015-03-31T20:00:29.363 に答える