In Spring-Kafka I want to re-consume a Kafka topic from the start. When I do this by changing the group.id to something unknown to Kafka, it works, of course.

@KafkaListener(topics = "sensordata.t")
public void receiveMessage(String message) {
...
}

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "NewGroupID");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); //it still commits though...
    return props;
}

However, when I try to start over by setting the offset to 0, it fails.

@KafkaListener(topicPartitions =
{ @TopicPartition(topic = "sensordata.t",
        partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0"))})
public void receiveMessage(String message) {
...
}

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    ...
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "NewGroupID");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000"); //making timeout window larger seems to have no influence
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1"); //setting max records to 1 makes no difference
    return props;
}

The error I get:

2016-11-14 14:07:59.018  INFO 8165 --- [           main] c.i.t.s.server.SpringKafkaApplication    : Started SpringKafkaApplication in 4.134 seconds (JVM running for 4.745)
2016-11-14 14:07:59.125  INFO 8165 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator  : Discovered coordinator bto:9092 (id: 2147483647 rack: null) for group spring8.
2016-11-14 14:07:59.125  INFO 8165 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator  : Discovered coordinator bto:9092 (id: 2147483647 rack: null) for group spring8.
2016-11-14 14:07:59.129  INFO 8165 --- [afka-consumer-1] o.a.k.c.c.internals.ConsumerCoordinator  : Revoking previously assigned partitions [] for group spring8
2016-11-14 14:07:59.129  INFO 8165 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[]
2016-11-14 14:07:59.129  INFO 8165 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator  : (Re-)joining group spring8
2016-11-14 14:07:59.338 ERROR 8165 --- [afka-consumer-1] essageListenerContainer$ListenerConsumer : Container exception

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:600) ~[kafka-clients-0.10.0.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:541) ~[kafka-clients-0.10.0.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679) ~[kafka-clients-0.10.0.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658) ~[kafka-clients-0.10.0.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167) ~[kafka-clients-0.10.0.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) ~[kafka-clients-0.10.0.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) ~[kafka-clients-0.10.0.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426) ~[kafka-clients-0.10.0.1.jar:na]
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278) ~[kafka-clients-0.10.0.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360) ~[kafka-clients-0.10.0.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224) ~[kafka-clients-0.10.0.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192) ~[kafka-clients-0.10.0.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) ~[kafka-clients-0.10.0.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:426) ~[kafka-clients-0.10.0.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1059) ~[kafka-clients-0.10.0.1.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:939) ~[spring-kafka-1.1.1.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:816) ~[spring-kafka-1.1.1.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:526) ~[spring-kafka-1.1.1.RELEASE.jar:na]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_92]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_92]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_92] 

Is anyone familiar with this?

I am using Kafka 0.10.1.0

<properties>
    <java.version>1.8</java.version>
    <spring-kafka.version>1.1.1.RELEASE</spring-kafka.version>
</properties>

1 Answer

Why did you decide that the problem is offset 0? Your stack trace says that your pollTimeout is longer than session.timeout.ms:

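Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.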

How about adjusting them accordingly?
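
As a rough illustration of what "adjusting" could look like, here is a minimal sketch that builds the consumer properties and checks that a given poll timeout stays below session.timeout.ms. It uses the plain Kafka property names (the strings behind the ConsumerConfig constants) so that it runs without any Kafka dependency. The class ConsumerTimeoutSketch and the helper pollFitsInSession are hypothetical names introduced here, and the concrete values (1000 ms, 10000 ms, 15000 ms) are assumptions for the example, not values taken from the failing application.

```java
import java.util.HashMap;
import java.util.Map;

public class ConsumerTimeoutSketch {

    // Builds consumer properties with the plain Kafka property names.
    // sessionTimeoutMs and maxPollRecords are the two knobs named in the exception message.
    public static Map<String, Object> consumerConfigs(int sessionTimeoutMs, int maxPollRecords) {
        Map<String, Object> props = new HashMap<>();
        props.put("group.id", "NewGroupID");
        props.put("auto.offset.reset", "earliest");
        props.put("enable.auto.commit", "false");
        props.put("session.timeout.ms", sessionTimeoutMs);
        props.put("max.poll.records", maxPollRecords);
        return props;
    }

    // Hypothetical helper: returns true when the poll timeout is strictly
    // shorter than the configured session timeout.
    public static boolean pollFitsInSession(long pollTimeoutMs, Map<String, Object> props) {
        int sessionTimeoutMs = (Integer) props.get("session.timeout.ms");
        return pollTimeoutMs < sessionTimeoutMs;
    }

    public static void main(String[] args) {
        Map<String, Object> props = consumerConfigs(10000, 1);
        // Assumed poll timeouts, chosen only to show both outcomes of the check.
        System.out.println("1000 ms fits: " + pollFitsInSession(1000L, props));
        System.out.println("15000 ms fits: " + pollFitsInSession(15000L, props));
    }
}
```

In Spring Kafka the poll timeout itself is set on the listener container's ContainerProperties (setPollTimeout), so a check like this would compare that value against the session.timeout.ms passed to the consumer factory.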

answered 2016-11-14T14:39:49.220