16

spring-kafka 2.1.x を使用して Spring Boot 2.0 アプリケーションでデッド レター キュー (DLQ)の概念を実装し、Bean の@KafkaListenerメソッドによって処理できなかったすべてのメッセージを事前定義された Kafka DLQ トピックに送信する最良の方法は何ですか?単一のメッセージを失うことはありませんか?

したがって、消費された Kafka レコードは次のいずれかです。

  1. 正常に処理され、
  2. 処理に失敗し、DLQ トピックに送信されます。
  3. 処理に失敗し、(予期しない問題により) DLQ トピックに送信されないため、リスナーによって再び消費されます。

KafkaTemplate を使用して DLQ トピックへの処理に失敗したレコードを送信するErrorHandlerのカスタム実装でリスナー コンテナーを作成しようとしました。無効化された自動コミットとRECORD AckMode の使用。

spring.kafka.enable-auto-ack=false
spring.kafka.listener.ack-mode=RECORD

@Configuration
public class KafkaConfig {
    @Bean
    ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = ...
        ...
        factory.getContainerProperties().setErrorHandler(dlqErrorHandler);
        return factory;
    }
}

@Component
public class DlqErrorHandler implements ErrorHandler {

    @Autowired
    private KafkaTemplate<Object, Object> kafkaTemplate;

    @Value("${dlqTopic}")
    private String dlqTopic;

    @Override
    public void handle(Exception thrownException, ConsumerRecord<?, ?> record) {
        log.error("Error, sending to DLQ...");
        kafkaTemplate.send(dlqTopic, record.key(), record.value());
    }
}

この実装は項目#3を保証していないようです。DlqErrorHandler で例外がスローされた場合、レコードはリスナーによって再び消費されません。

トランザクション リスナー コンテナーの使用は役に立ちますか?

factory.getContainerProperties().setTransactionManager(kafkaTransactionManager);

Spring Kafka を使用して DLQ の概念を実装する便利な方法はありますか?

2018/03/28更新

Gary Russell の回答のおかげで、次のように DlqErrorHandler を実装することで、目的の動作を実現できました。

@Configuration
public class KafkaConfig {
    @Bean
    ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = ...
        ...
        factory.getContainerProperties().setAckOnError(false);
        factory.getContainerProperties().setErrorHandler(dlqErrorHandler);
        return factory;
    }
}

@Component
public class DlqErrorHandler implements ContainerAwareErrorHandler {
    ...
    @Override
    public void handle(Exception thrownException, list<ConsumerRecord<?, ?> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
        Consumerrecord<?, ? record = records.get(0);
        try {
            kafkaTemplate.send("dlqTopic", record.key, record.value());
            consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset() + 1);
            // Other records may be from other partitions, so seek to current offset for other partitions too
            // ...
        } catch (Exception e) {
            consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset());
            // Other records may be from other partitions, so seek to current offset for other partitions too
            // ...
            throw new KafkaException("Seek to current after exception", thrownException);
        }
    }
}

この方法では、コンシューマー ポーリングが 3 つのレコード (1、2、3) を返し、2 番目のレコードを処理できない場合:

  • 1が処理されます
  • 2 は処理に失敗し、DLQ に送信されます
  • 3 コンシューマーが record.offset() + 1 をシークしたおかげで、リスナーに配信されます

DLQ への送信が失敗した場合、消費者は record.offset() をシークし、レコードはリスナーに再配信されます (DLQ への送信はおそらく廃止されます)。

2021/04/30更新

Spring Kafka 2.7.0 以降、ノンブロッキングの再試行と配信不能トピックがネイティブでサポートされています。

例を参照してください: https://github.com/evgeniy-khist/spring-kafka-non-blocking-retries-and-dlt

通常、再試行はノンブロッキング (別のトピックで行う) で、遅延させる必要があります。

  • リアルタイムのトラフィックを中断しないため。
  • 呼び出しの数を増幅させず、本質的に不正なリクエストをスパム送信します。
  • 可観測性のため (再試行回数とその他のメタデータを取得するため)。通常、Kafka でノンブロッキングの再試行と DLT 機能を実現するには、追加のトピックを設定し、対応するリスナーを作成して構成する必要があります。 Kafka ノンブロッキング再試行と DLT
4

1 に答える 1