しばらくすると、消費者がトピックからのメッセージの消費をランダムに停止し、スローされたエラーがないという問題が発生しています。これはテスト環境であるため、トピックには多くのメッセージがありません。
そのトピックの消費者のラグは > 0 になり、その数で (それを消費する消費者がいないように) 無期限にスタックします。kafka ツールを使用してトピックを調べたところ、パーティションがコンシューマーに割り当てられていることがわかります。
トピックを消費するコンシューマーの複数のスレッドを実行しています。コードは次のとおりです。
public Runnable getRunnable() {
return () -> {
final ReactiveKafkaConsumerTemplate kafkaConsumerTemplate =
kafkaLib.createKafkaConsumerTemplate();
kafkaConsumerTemplate.receive()
.concatMap(record -> {
// process message and commit offset
})
.subscribe();
};
}
@Override
public void run(ApplicationArguments args) {
ExecutorService executorService = Executors.newFixedThreadPool(2);
for (int i = 0; i < 2; i++) {
executorService.execute(getRunnable());
}
}
私の質問: トピックにプッシュされたメッセージが一定期間ない場合、 kafkaConsumperTemplate.receive() が自動的にサブスクリプションを停止/終了/キャンセルする可能性はありますか? それとも、サブスクリプションが終了した後、スレッドが何らかの形で終了するのでしょうか?
リアクティブなカフカ コンシューマを使用すると、注釈がなくなる@KafkaListener
ため、ApplicationRunner を使用してコンシューマのスレッドを生成します。反応的なカフカ消費者を開始する別の正しい方法はありますか?