Spring-Kafka では、最初から Kafka トピックを再利用したいと考えています。group.id
を Kafka にとって未知のものに変更することでこれを行うと、もちろん機能します。
@KafkaListener(topics = "sensordata.t")
public void receiveMessage(String message) {
...
}
@Bean
public Map consumerConfigs() {
Map props = new HashMap<>();
props.put(ConsumerConfig.GROUP_ID_CONFIG, "NewGroupID");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); //it still commits though...
return props;
}
ただし、オフセットを 0 に設定してやり直すと失敗します。
@KafkaListener(topicPartitions =
{ @TopicPartition(topic = "sensordata.t",
partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0"))})
public void receiveMessage(String message) {
...
}
@Bean
public Map consumerConfigs() {
Map props = new HashMap<>();
...
props.put(ConsumerConfig.GROUP_ID_CONFIG, "NewGroupID");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000"); //making timeout window larger seems to have no influence
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1"); //setting max records to 1 makes no difference
return props;
}
私が得るエラー:
2016-11-14 14:07:59.018 INFO 8165 --- [ main] c.i.t.s.server.SpringKafkaApplication : Started SpringKafkaApplication in 4.134 seconds (JVM running for 4.745)
2016-11-14 14:07:59.125 INFO 8165 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator : Discovered coordinator bto:9092 (id: 2147483647 rack: null) for group spring8.
2016-11-14 14:07:59.125 INFO 8165 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator : Discovered coordinator bto:9092 (id: 2147483647 rack: null) for group spring8.
2016-11-14 14:07:59.129 INFO 8165 --- [afka-consumer-1] o.a.k.c.c.internals.ConsumerCoordinator : Revoking previously assigned partitions [] for group spring8
2016-11-14 14:07:59.129 INFO 8165 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked:[]
2016-11-14 14:07:59.129 INFO 8165 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator : (Re-)joining group spring8
2016-11-14 14:07:59.338 ERROR 8165 --- [afka-consumer-1] essageListenerContainer$ListenerConsumer : Container exception
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:600) ~[kafka-clients-0.10.0.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:541) ~[kafka-clients-0.10.0.1.jar:na]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679) ~[kafka-clients-0.10.0.1.jar:na]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658) ~[kafka-clients-0.10.0.1.jar:na]
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167) ~[kafka-clients-0.10.0.1.jar:na]
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) ~[kafka-clients-0.10.0.1.jar:na]
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) ~[kafka-clients-0.10.0.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426) ~[kafka-clients-0.10.0.1.jar:na]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278) ~[kafka-clients-0.10.0.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360) ~[kafka-clients-0.10.0.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224) ~[kafka-clients-0.10.0.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192) ~[kafka-clients-0.10.0.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) ~[kafka-clients-0.10.0.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:426) ~[kafka-clients-0.10.0.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1059) ~[kafka-clients-0.10.0.1.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:939) ~[spring-kafka-1.1.1.RELEASE.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:816) ~[spring-kafka-1.1.1.RELEASE.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:526) ~[spring-kafka-1.1.1.RELEASE.jar:na]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_92]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_92]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_92]
これに詳しい人はいますか?
私は使用Kafka 0.10.1.0
しています
<properties>
<java.version>1.8</java.version>
<spring-kafka.version>1.1.1.RELEASE</spring-kafka.version>
</properties>