4

テスト 1 とテスト 2 の 2 つのトピックを読む必要がある spring-kafka アプリを作成しています。

public class Receiver {

    private static final Logger LOGGER = LoggerFactory
            .getLogger(Receiver.class);

    @KafkaListener(id = "bar", topicPartitions =
{ @TopicPartition(topic = "test1", partitions = { "0" }),
  @TopicPartition(topic = "test2", partitions = { "0" })})
    public void receiveMessage(String message) {
        LOGGER.info("received message='{}'", message);
    }
}

私の設定は次のようになります:

@Configuration
@EnableKafka
public class ReceiverConfig {

    @Value("${kafka.bootstrap.servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // list of host:port pairs used for establishing the initial connections
        // to the Kakfa cluster
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        // consumer groups allow a pool of processes to divide the work of
        // consuming and processing records
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test1");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test2");

        return props;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }

    @Bean
    public Receiver receiver() {
        return new Receiver();
    }
}

「test2」の最初からすべてのメッセージを読み取ることができる一方で、「test1」からは最新のメッセージのみを読み取ることができる必要があります。アプリの起動時に「test2」メッセージのみに関心がありますが、「test1」メッセージは、アプリが実行されている限り継続的に読み取る必要があります。

そのような機能を構成する方法はありますか?

4

2 に答える 2

7

これが私のために働いた方法です:

@KafkaListener(id = "receiver-api",         
        topicPartitions =
        { @TopicPartition(topic = "schema.topic", 
                partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0")),
                @TopicPartition(topic = "data.topic", partitions = { "0" })})
    public void receiveMessage(String message) {
        try {
                JSONObject incomingJsonObject = new JSONObject(message); 
                if(!incomingJsonObject.isNull("data")){
                    handleSchemaMessage(incomingJsonObject);
                }

                else {
                    handleDataMessage(incomingJsonObject);
                }

        } catch (Exception e) {
            e.printStackTrace();
        }

「partitionOffsets」アノテーションの使用(import org.springframework.kafka.annotation.PartitionOffset;)

いつものように他のトピックを「尾行」しながら、常に特定のトピックを最初から読むことができるようにするための鍵でした.

于 2016-11-01T11:51:10.603 に答える