以下は私のコンシューマーとプロデューサーの XML です。トピックへのメッセージ送信とコンシューマーでの読み取りは正常に行えますが、コンソールには以下のエラーが出力されます。
10823 [taskExecutor-30] ERROR org.springframework.integration.handler.LoggingHandler - org.springframework.messaging.MessagingException: Consuming from Kafka has failed; nested exception is java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Iterator is in failed state
コンシューマー XML
<?xml version="1.0" encoding="UTF-8"?>
<!--
    Consumer-side Spring Integration configuration.

    Flow: int-kafka:inbound-channel-adapter (topic "event-stream")
          to queue channel "inputFromKafka"
          to service activator calling the "reader" method of "messageProcessor".

    The "messageProcessor" bean is referenced here but not defined in this
    document; it must be supplied by another configuration source.

    All schema locations are version-less so that each one resolves to the XSD
    shipped inside the jar that is actually on the classpath.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:stream="http://www.springframework.org/schema/integration/stream"
       xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:task="http://www.springframework.org/schema/task"
       xsi:schemaLocation="
           http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd
           http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
           http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
           http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

    <!-- Queue-backed channel that buffers messages between the Kafka adapter and the service activator. -->
    <int:channel id="inputFromKafka">
        <int:queue/>
    </int:channel>

    <!--
        Polls inputFromKafka at a fixed rate of 50 (milliseconds by default) on
        "taskExecutor" threads and passes each message to messageProcessor.reader.
        NOTE(review): "taskExecutor" is also the executor of the Kafka
        consumer-configuration below, so both compete for the same 50 threads.
        Consider a dedicated executor for one of them; confirm under load.
    -->
    <int:service-activator input-channel="inputFromKafka"
                           ref="messageProcessor"
                           method="reader">
        <int:poller fixed-rate="50" task-executor="taskExecutor"/>
    </int:service-activator>

    <!--
        Additional Kafka consumer properties handed to the consumer context:
        offset reset policy "largest", 10485760-byte (10 MiB) socket receive
        buffer, 5242880-byte (5 MiB) maximum fetch size and a 100000 ms
        auto-commit interval.
    -->
    <bean id="consumerProperties"
          class="org.springframework.beans.factory.config.PropertiesFactoryBean">
        <property name="properties">
            <props>
                <prop key="auto.offset.reset">largest</prop>
                <prop key="socket.receive.buffer.bytes">10485760</prop>
                <prop key="fetch.message.max.bytes">5242880</prop>
                <prop key="auto.commit.interval.ms">100000</prop>
            </props>
        </property>
    </bean>

    <!-- ZooKeeper connection used by the consumer context (local instance on port 2181). -->
    <int-kafka:zookeeper-connect id="zookeeperConnect"
                                 zk-connect="localhost:2181"
                                 zk-connection-timeout="6000"
                                 zk-session-timeout="6000"
                                 zk-sync-time="2000"/>

    <!--
        Project-specific bean whose initIt method runs at container start-up.
        NOTE(review): ConsumerStarter is not visible in this document. If it
        also reads from "consumerContext" on its own thread, two threads would
        share one Kafka stream iterator, which is a plausible source of
        "Iterator is in failed state"; verify against its implementation.
    -->
    <bean id="kafkaThreadListener"
          class="com.maistylz.adminui.integration.ConsumerStarter"
          init-method="initIt"/>

    <!--
        Reads from Kafka through "consumerContext" and publishes to
        inputFromKafka. Polled with a fixed delay of 1000 ms and a receive
        timeout of 1000; no task-executor is configured on this poller.
    -->
    <int-kafka:inbound-channel-adapter id="kafka-inbound-channel-adapter"
                                       kafka-consumer-context-ref="consumerContext"
                                       channel="inputFromKafka"
                                       auto-startup="true">
        <int:poller fixed-delay="1000"
                    time-unit="MILLISECONDS"
                    receive-timeout="1000"/>
    </int-kafka:inbound-channel-adapter>

    <!-- Thread pool shared by the service-activator poller and the Kafka consumer-configuration. -->
    <task:executor id="taskExecutor"
                   pool-size="50"
                   keep-alive="120"
                   queue-capacity="500"/>

    <!--
        Consumer context: group "default", one stream on topic "event-stream",
        at most 500 messages per poll.
        NOTE(review): consumer-timeout is 400000, presumably milliseconds
        (about 400 s), far larger than the 1000 ms poller settings above;
        confirm this is intended, since a poll may wait that long for
        max-messages to be reached.
    -->
    <int-kafka:consumer-context id="consumerContext"
                                consumer-timeout="400000"
                                zookeeper-connect="zookeeperConnect"
                                consumer-properties="consumerProperties">
        <int-kafka:consumer-configurations>
            <int-kafka:consumer-configuration group-id="default"
                                              max-messages="500"
                                              executor="taskExecutor">
                <int-kafka:topic id="event-stream" streams="1"/>
            </int-kafka:consumer-configuration>
        </int-kafka:consumer-configurations>
    </int-kafka:consumer-context>

</beans>
プロデューサー XML
<?xml version="1.0" encoding="UTF-8"?>
<!--
    Producer-side Spring Integration configuration.

    Flow: queue channel "inputToKafka"
          to int-kafka:outbound-channel-adapter
          to Kafka topic "event-stream" on broker localhost:9092.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
       xmlns:task="http://www.springframework.org/schema/task"
       xsi:schemaLocation="
           http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
           http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

    <!-- Queue-backed channel on which application code places messages destined for Kafka. -->
    <int:channel id="inputToKafka">
        <int:queue/>
    </int:channel>

    <!--
        Kafka producer properties: metadata refresh interval of 360000000 ms
        (100 hours), up to 5 send retries and a 5242880-byte (5 MiB) send buffer.
        NOTE(review): no element in this document references this bean by id;
        confirm whether it is wired in elsewhere.
    -->
    <bean id="producerProperties"
          class="org.springframework.beans.factory.config.PropertiesFactoryBean">
        <property name="properties">
            <props>
                <prop key="topic.metadata.refresh.interval.ms">360000000</prop>
                <prop key="message.send.max.retries">5</prop>
                <prop key="send.buffer.bytes">5242880</prop>
            </props>
        </property>
    </bean>

    <!-- String encoder used for both message keys and message values. -->
    <bean id="encoder"
          class="org.springframework.integration.kafka.serializer.common.StringEncoder"/>

    <!--
        Drains inputToKafka with a fixed delay of 1000 (milliseconds by default)
        and a receive timeout of 0, running on "taskExecutor", and hands the
        messages to "kafkaProducerContext".
    -->
    <int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
                                        kafka-producer-context-ref="kafkaProducerContext"
                                        channel="inputToKafka"
                                        auto-startup="true">
        <int:poller fixed-delay="1000"
                    receive-timeout="0"
                    task-executor="taskExecutor"/>
    </int-kafka:outbound-channel-adapter>

    <!-- Thread pool used by the outbound adapter poller. -->
    <task:executor id="taskExecutor"
                   pool-size="5"
                   keep-alive="1200"
                   queue-capacity="10000"/>

    <!-- Producer context: a single configuration publishing String keys and values to "event-stream". -->
    <int-kafka:producer-context id="kafkaProducerContext">
        <int-kafka:producer-configurations>
            <int-kafka:producer-configuration topic="event-stream"
                                              broker-list="localhost:9092"
                                              key-class-type="java.lang.String"
                                              key-encoder="encoder"
                                              value-class-type="java.lang.String"
                                              value-encoder="encoder"/>
        </int-kafka:producer-configurations>
    </int-kafka:producer-context>

</beans>