1

spring- kafkaのドキュメントに基づいて、アノテーション ベースの @KafkaListener を使用してコンシューマーを構成しています。

私が見ているのは -

  1. 開始時にオフセットをゼロに指定しない限り、Kafka コンシューマーは既存のメッセージではなく将来のメッセージを取得します。(私が望むものへのオフセットを指定していないので、これは予想される結果であることを理解しています)

  2. ドキュメントには、トピックとパーティションの組み合わせを指定し、それに加えてゼロのオフセットを指定するオプションがありますが、これを行う場合は、消費者に聞いてもらいたいトピックを明示的に指定する必要があります。

上記のアプローチ 2 を使用すると、これが私の消費者の現在の外観です。

@KafkaListener(id = "{group.id}",
        topicPartitions = {
                @TopicPartition(topic = "${kafka.topic.name}",
                        partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0"))
        },
        containerFactory = "kafkaListenerContainerFactory")
public void listen(@Payload String payload,
                   Acknowledgment ack) throws InterruptedException, IOException {

    logger.debug("This is what we received in the Kafka Consumer = " + payload);

    idService.process(payload);

    ack.acknowledge();
}

「topicPattern」ワイルドカードまたは「トピック」リストをアノテーション構成の一部として指定するオプションがあることは理解していますが、ゼロから開始するオフセット値を指定できる場所がわかりません。トピック/トピックパターンがリストされています。両方を組み合わせる方法はありますか?お知らせ下さい。

4

1 に答える 1

7

(パーティションを明示的に宣言するのではなく) トピックと topicPatterns を使用する場合、Kafka はどのコンシューマー インスタンスがどのパーティションを取得するかを決定します。

Kafka はパーティションを割り当て、初期オフセットはそのグループ ID に対して最後にコミットされたものになります。現在そのオフセットを変更することはできませんが、シーク機能の追加を検討しています。

常に最初に利用可能なオフセットから開始したい場合は、一意のグループ ID (例: UUID.randomUUID().toString()) を使用して設定します

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

Kafka にはそのグループ ID の既存のオフセットがないため、そのプロパティを使用してどこから開始するかを決定します。

MANUAL ack モードを使用して決して ack しないこともできます。これは実質的に同じことを行います。

于 2016-08-23T14:28:14.700 に答える