1

以下は私のコンシューマーとプロデューサーの XML 設定です。トピックへのメッセージ送信も、コンシューマー側での読み取りも正常にできていますが、コンソールに以下のエラーが出力されます。

10823 [taskExecutor-30] ERROR org.springframework.integration.handler.LoggingHandler - org.springframework.messaging.MessagingException: Consuming from Kafka failed; nested exception is java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Iterator is in failed state

コンシューマー XML

<?xml version="1.0" encoding="UTF-8"?>
<!--
  Consumer-side Spring Integration context.

  Flow: the int-kafka inbound channel adapter polls the "consumerContext"
  consumer context (topic "event-stream") and places what it reads on the
  "inputFromKafka" queue channel; a polled service activator then hands each
  message to the "reader" method of the "messageProcessor" bean.

  "messageProcessor" is not defined in this file and must be supplied by
  another part of the application context.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:stream="http://www.springframework.org/schema/integration/stream"
       xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:task="http://www.springframework.org/schema/task"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
           http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd
           http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
           http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd
           http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd">

    <!--
      The core integration schema is referenced without a version suffix, like
      every other schema here, so it resolves to the XSD shipped with the
      Spring Integration jar on the classpath instead of being pinned to 4.0.
    -->

    <!-- Queue-backed channel between the Kafka adapter and the service activator. -->
    <int:channel id="inputFromKafka">
        <int:queue/>
    </int:channel>

    <!--
      Drains "inputFromKafka" and invokes messageProcessor.reader(...).
      The poller runs on "taskExecutor", which is also the executor given to
      the Kafka consumer configuration below.
      NOTE(review): confirm that sharing one pool between the two is intended.
    -->
    <int:service-activator input-channel="inputFromKafka"
                           ref="messageProcessor"
                           method="reader">
        <int:poller fixed-rate="50" task-executor="taskExecutor"/>
    </int:service-activator>

    <!-- Properties passed to the Kafka consumer through the consumer context. -->
    <bean id="consumerProperties"
          class="org.springframework.beans.factory.config.PropertiesFactoryBean">
        <property name="properties">
            <props>
                <prop key="auto.offset.reset">largest</prop>
                <!-- 10485760 bytes = 10 MiB -->
                <prop key="socket.receive.buffer.bytes">10485760</prop>
                <!-- 5242880 bytes = 5 MiB -->
                <prop key="fetch.message.max.bytes">5242880</prop>
                <prop key="auto.commit.interval.ms">100000</prop>
            </props>
        </property>
    </bean>

    <!-- ZooKeeper connection settings referenced by the consumer context. -->
    <int-kafka:zookeeper-connect id="zookeeperConnect"
                                 zk-connect="localhost:2181"
                                 zk-connection-timeout="6000"
                                 zk-session-timeout="6000"
                                 zk-sync-time="2000"/>

    <!--
      Application bean; the container calls its initIt() method after creating it.
      NOTE(review): what ConsumerStarter does is not visible from this file.
    -->
    <bean id="kafkaThreadListener"
          class="com.maistylz.adminui.integration.ConsumerStarter"
          init-method="initIt"/>

    <!-- Polls the consumer context and publishes the result to "inputFromKafka". -->
    <int-kafka:inbound-channel-adapter id="kafka-inbound-channel-adapter"
                                       kafka-consumer-context-ref="consumerContext"
                                       channel="inputFromKafka"
                                       auto-startup="true">
        <int:poller fixed-delay="1000"
                    time-unit="MILLISECONDS"
                    receive-timeout="1000"/>
    </int-kafka:inbound-channel-adapter>

    <!-- Thread pool used by both the service-activator poller and the Kafka consumer. -->
    <task:executor id="taskExecutor"
                   pool-size="50"
                   keep-alive="120"
                   queue-capacity="500"/>

    <!-- Consumer group "default" reading topic "event-stream" with a single stream. -->
    <int-kafka:consumer-context id="consumerContext"
                                consumer-timeout="400000"
                                zookeeper-connect="zookeeperConnect"
                                consumer-properties="consumerProperties">
        <int-kafka:consumer-configurations>
            <int-kafka:consumer-configuration group-id="default"
                                              max-messages="500"
                                              executor="taskExecutor">
                <int-kafka:topic id="event-stream" streams="1"/>
            </int-kafka:consumer-configuration>
        </int-kafka:consumer-configurations>
    </int-kafka:consumer-context>

</beans>

プロデューサー XML

<?xml version="1.0" encoding="UTF-8"?>
<!--
  Producer-side Spring Integration context.

  Messages placed on the "inputToKafka" queue channel are picked up by the
  int-kafka outbound channel adapter and sent through "kafkaProducerContext"
  to topic "event-stream" on the broker at localhost:9092.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
       xmlns:task="http://www.springframework.org/schema/task"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd
           http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
           http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd">

    <!-- Queue-backed channel feeding the outbound Kafka adapter. -->
    <int:channel id="inputToKafka">
        <int:queue/>
    </int:channel>

    <!--
      Kafka producer settings.
      NOTE(review): nothing in this file references "producerProperties";
      confirm where (or whether) it is consumed.
    -->
    <bean id="producerProperties"
          class="org.springframework.beans.factory.config.PropertiesFactoryBean">
        <property name="properties">
            <props>
                <prop key="topic.metadata.refresh.interval.ms">360000000</prop>
                <prop key="message.send.max.retries">5</prop>
                <!-- 5242880 bytes = 5 MiB -->
                <prop key="send.buffer.bytes">5242880</prop>
            </props>
        </property>
    </bean>

    <!-- String encoder used for both message keys and message values. -->
    <bean id="encoder"
          class="org.springframework.integration.kafka.serializer.common.StringEncoder"/>

    <!-- Drains "inputToKafka" on "taskExecutor" and sends via the producer context. -->
    <int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
                                        channel="inputToKafka"
                                        kafka-producer-context-ref="kafkaProducerContext"
                                        auto-startup="true">
        <int:poller fixed-delay="1000"
                    receive-timeout="0"
                    task-executor="taskExecutor"/>
    </int-kafka:outbound-channel-adapter>

    <!--
      Thread pool for the outbound adapter's poller.
      NOTE(review): the consumer XML declares a different executor under the
      same id "taskExecutor" (pool-size 50). If both files are loaded into one
      application context the two definitions collide; confirm which one wins.
    -->
    <task:executor id="taskExecutor"
                   pool-size="5"
                   keep-alive="1200"
                   queue-capacity="10000"/>

    <!-- Single producer configuration: String keys and values for "event-stream". -->
    <int-kafka:producer-context id="kafkaProducerContext">
        <int-kafka:producer-configurations>
            <int-kafka:producer-configuration topic="event-stream"
                                              broker-list="localhost:9092"
                                              key-class-type="java.lang.String"
                                              key-encoder="encoder"
                                              value-class-type="java.lang.String"
                                              value-encoder="encoder"/>
        </int-kafka:producer-configurations>
    </int-kafka:producer-context>

</beans>
4

0 に答える 0