1

spring-kafka 1.0.5 を使用して、同時実行数が 10 の 10 個のパーティションを持つ忙しいトピックから消費しています。

私の現在のコードは、HashMap に保持されているパーティション ID に基づいてメッセージをキューに追加します。

@KafkaListener(topics = "${kafka.topic}")
public void onMessage(ConsumerRecord consumerRecord, Acknowledgment acknowledgment) {
    //Pseudo code
    add to Hashmap<Integer, Queue<ConsumerRecord>> based on partition.
}

残念ながら、そのデザインは、単純な消費の 2 倍の処理時間を要します。

私の要件は、パーティションを個別に処理することですが、@KafkaListener に基づくパーティションへの参照を含むハッシュマップを回避するにはどうすればよいですか。

これについてもっと効率的な方法はありますか?理想的には、リスナー アノテーションの各スレッドが独自のリストを管理します。パーティションIDに基づいて上記のハッシュマップなどの相互参照を持たずにそれを行う方法はありますか?

4

1 に答える 1

1

@KafkaListener必要なパーティションごとにいくつかのメソッドを宣言することを検討してください。この目的のためtopicPartitionsに、代わりに属性を使用する必要がありtopicsます。

/**
 * Used to add topic/partition information to a {@code KafkaListener}.
 *
 */
@Target({})
@Retention(RetentionPolicy.RUNTIME)
public @interface TopicPartition {
于 2016-12-13T18:41:21.883 に答える