0

しばらくすると、消費者がトピックからのメッセージの消費をランダムに停止し、スローされたエラーがないという問題が発生しています。これはテスト環境であるため、トピックには多くのメッセージがありません。

そのトピックの消費者のラグは > 0 になり、その数で (それを消費する消費者がいないように) 無期限にスタックします。kafka ツールを使用してトピックを調べたところ、パーティションがコンシューマーに割り当てられていることがわかります。

トピックを消費するコンシューマーの複数のスレッドを実行しています。コードは次のとおりです。

  public Runnable getRunnable() {
    return () -> {
      final ReactiveKafkaConsumerTemplate kafkaConsumerTemplate = 
          kafkaLib.createKafkaConsumerTemplate();
      kafkaConsumerTemplate.receive()
          .concatMap(record -> {
            // process message and commit offset

          })
          .subscribe();
    };
  }

  @Override
  public void run(ApplicationArguments args) {
    ExecutorService executorService = Executors.newFixedThreadPool(2);
    for (int i = 0; i < 2; i++) {
      executorService.execute(getRunnable());
    }

}

私の質問: トピックにプッシュされたメッセージが一定期間ない場合、 kafkaConsumperTemplate.receive() が自動的にサブスクリプションを停止/終了/キャンセルする可能性はありますか? それとも、サブスクリプションが終了した後、スレッドが何らかの形で終了するのでしょうか?

リアクティブなカフカ コンシューマを使用すると、注釈がなくなる@KafkaListenerため、ApplicationRunner を使用してコンシューマのスレッドを生成します。反応的なカフカ消費者を開始する別の正しい方法はありますか?

4

0 に答える 0