1

Spring Kafka ドキュメントのバッチ パターンを使用して、Spring Boot アプリで Kafka コンシューマーをセットアップしました。トピックに負荷をかけるまでは機能しているように見えました。その後、オフセットは更新されず、コンシューマー メッセージの同じバッチが何度も処理されます。

デフォルトでは、バッチ サイズは、フレームワークのアルゴリズムに基づいて動的に表示されます。そのため、トピックがますます遅れるにつれて、メッセージのバッチ処理は数百から数千になりました。バッチ サイズ (max.poll.records) を 25 に設定することで問題を回避できましたが、根本的な原因を理解するまでは Kafka/Spring-Kafka を信頼していません。

バッチが完了するまでに 3000 ミリ秒から 15000 ミリ秒かかります。

ファクトリのプロパティ:

    public Map<String, Object> getKafkaConfigurationProperties() {
    ImmutableMap.Builder<String, Object> builderMap = new ImmutableMap.Builder<String, Object>()
            .put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Joiner.on(',').join(getBootstrapServers()))
            .put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, getKeyDeserializer())
            .put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100)
            .put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false)
            .put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000)
            .put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, getValueDeserializer())
            .put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 25);

    String groupId = getConsumerGroup();
    if (StringUtils.isNotBlank(groupId)) {
        builderMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    }

    return builderMap.build();

}

工場ビルド:

private KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> makeKafkaListener(
        ConsumerFactory<String, String> consumerFactory, KafkaConsumerConfiguration configuration) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setConcurrency(6);
    factory.setBatchListener(true);
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.BATCH);
    return factory;
}

リスナー サービスは、次の注釈を使用します。

@KafkaListener(
        containerFactory = "serviceListenerContainerFactory",
        group = "kafkaListenerContainers",
        id = "widgetListener",
        topics = "widget.topic.prod")
public void listenProduct(List<ConsumerRecord<String, String>> consumerRecordList) { ... stuff ... }
4

0 に答える 0