2

spring-integration-kafka を 1.0.0.M2 から 2.1.0.RELEASE に、クライアントを 0.9.0 から 0.10.0 にアップグレードしています。

以下のような現在のxml構成

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
	xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
	xmlns:task="http://www.springframework.org/schema/task"
	xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
		http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
		http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
		http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

	<!-- Publish-subscribe channel; the Kafka outbound adapter in this file names it in its channel attribute. -->
	<int:publish-subscribe-channel id="inputToKafka"/>

	<!-- Kafka outbound adapter fed by the inputToKafka channel; producers come from kafkaProducerContext. -->
	<int-kafka:outbound-channel-adapter
		channel="inputToKafka"
		kafka-producer-context-ref="kafkaProducerContext"
		auto-startup="true"
		order="1"/>

	<!-- Producer properties; every value is a property placeholder named after its key. -->
	<bean id="producerProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean">
		<property name="properties">
			<props>
				<prop key="topic.metadata.refresh.interval.ms">${topic.metadata.refresh.interval.ms}</prop>
				<prop key="message.send.max.retries">${message.send.max.retries}</prop>
				<prop key="send.buffer.bytes">${send.buffer.bytes}</prop>
			</props>
		</property>
	</bean>
	
	<!-- Value encoder bean; its single constructor argument is the class name common.vo.NotificationVo. -->
	<bean id="fcmNotificationEncoder"
		class="org.springframework.integration.kafka.serializer.avro.AvroReflectDatumBackedKafkaEncoder">
		<constructor-arg>
			<value>common.vo.NotificationVo</value>
		</constructor-arg>
	</bean>

	<!--
		Producer context with a single producer configuration for topic trigger-fcm-notification:
		String keys, values encoded by fcmNotificationEncoder, compression codec "none".
		NOTE(review): value-class-type is common.vo.fcmNotificationVo, whereas the encoder bean is
		constructed with common.vo.NotificationVo; confirm the differing class names are intended.
	-->
	<int-kafka:producer-context id="kafkaProducerContext" producer-properties="producerProperties">
		<int-kafka:producer-configurations>
			<int-kafka:producer-configuration
				topic="trigger-fcm-notification"
				broker-list="${kafka.servers}"
				key-class-type="java.lang.String"
				value-class-type="common.vo.fcmNotificationVo"
				value-encoder="fcmNotificationEncoder"
				compression-codec="none"/>
		</int-kafka:producer-configurations>
	</int-kafka:producer-context>

	<!-- Hands messages from ip-chanel-trigger-fcm-notification to the fcmNotificationConsumer bean. -->
	<int:service-activator
		input-channel="ip-chanel-trigger-fcm-notification"
		ref="fcmNotificationConsumer"/>

	<!-- Consumer properties; every value is a property placeholder named after its key. -->
	<bean id="consumerProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean">
		<property name="properties">
			<props>
				<prop key="auto.offset.reset">${auto.offset.reset}</prop>
				<!-- 10M -->
				<prop key="socket.receive.buffer.bytes">${socket.receive.buffer.bytes}</prop>
				<prop key="fetch.message.max.bytes">${fetch.message.max.bytes}</prop>
				<prop key="auto.commit.interval.ms">${auto.commit.interval.ms}</prop>
			</props>
		</property>
	</bean>


	<!-- ZooKeeper connection settings (servers, connection/session timeouts, sync time), all taken from placeholders. -->
	<int-kafka:zookeeper-connect id="zookeeperConnect"
		zk-connect="${zookeeper.servers}"
		zk-connection-timeout="${zookeeper.connection.timeout}"
		zk-session-timeout="${zookeeper.session.timeout}"
		zk-sync-time="${zookeeper.sync.time}"/>


	<!-- Project bean api.utils.KafkaConsumerStarter; the container calls initIt on creation and cleanUp on destruction. -->
	<bean id="kafkaThreadListener"
		class="api.utils.KafkaConsumerStarter"
		init-method="initIt"
		destroy-method="cleanUp"/>

	<!--
		Polled Kafka inbound adapter bound to consumerContextFCM; output goes to
		ip-chanel-trigger-fcm-notification. auto-startup is "false", so it is not started
		together with the application context. Poller: fixed delay of 1000 ms, receive-timeout 0.
	-->
	<int-kafka:inbound-channel-adapter id="kafka-inbound-channel-adapter-FCM"
		kafka-consumer-context-ref="consumerContextFCM"
		channel="ip-chanel-trigger-fcm-notification"
		auto-startup="false">
		<int:poller fixed-delay="1000" time-unit="MILLISECONDS" receive-timeout="0"/>
	</int-kafka:inbound-channel-adapter>


	<!-- Consumer -->

	<!-- Value decoder bean; its single constructor argument is the class name common.vo.NotificationVo. -->
	<bean id="fcmNotificationDecoder"
		class="org.springframework.integration.kafka.serializer.avro.AvroReflectDatumBackedKafkaDecoder">
		<constructor-arg>
			<value>common.vo.NotificationVo</value>
		</constructor-arg>
	</bean>

	<!--
		Consumer context for group trigger-fcm-notification: topic trigger-fcm-notification with
		10 streams, max-messages 50, consumer-timeout 4000, values decoded by fcmNotificationDecoder.
	-->
	<int-kafka:consumer-context id="consumerContextFCM"
		zookeeper-connect="zookeeperConnect"
		consumer-properties="consumerProperties"
		consumer-timeout="4000">
		<int-kafka:consumer-configurations>
			<int-kafka:consumer-configuration
				group-id="trigger-fcm-notification"
				value-decoder="fcmNotificationDecoder"
				max-messages="50">
				<int-kafka:topic id="trigger-fcm-notification" streams="10"/>
			</int-kafka:consumer-configuration>
		</int-kafka:consumer-configurations>
	</int-kafka:consumer-context>

</beans>

これを 2.1.0.RELEASE に変更するにはどうすればよいですか?

~~~~~~~~~~~~~~~

ここで編集:

参照を使用して、私の要件に従ってxmlを変更しました。Consumer Record の読み取り中に小さな問題が発生しました。次のようにペイロードを取得しました

{
kafka_offset=7, 
kafka_receivedMessageKey=null, 
kafka_receivedPartitionId=0, 
kafka_receivedTopic=trigger-fcm-notification, 
kafka_acknowledgment=Acknowledgment for ConsumerRecord(topic = trigger-fcm-notification, partition = 0, offset = 7, CreateTime = 1476864644264, checksum = 3680317883, serialized key size = -1, serialized value size = 270, key = null, value = common.vo.NotificationVo@203c8c95)
}

コンシューマーでさらに使用するには、値 (NotificationVo) が必要です。ペイロードの一部として取得する方法は?

~~~~~~~~~~~~~~~

ここで編集:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
	xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
	xmlns:task="http://www.springframework.org/schema/task"
	xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
		http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
		http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
		http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

	<!-- Publish-subscribe channel; the Kafka outbound adapter in this file names it in its channel attribute. -->
	<int:publish-subscribe-channel id="inputToKafka"/>

	<!--
		Kafka outbound adapter: takes messages from inputToKafka and uses the "template" bean with
		topic trigger-fcm-notification. Its advice chain holds one RequestHandlerCircuitBreakerAdvice.
	-->
	<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
		channel="inputToKafka"
		kafka-template="template"
		topic="trigger-fcm-notification"
		auto-startup="true"
		order="1">
		<int-kafka:request-handler-advice-chain>
			<bean class="org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice"/>
		</int-kafka:request-handler-advice-chain>
	</int-kafka:outbound-channel-adapter>

	<!--Producer-->
	<!--
		KafkaTemplate built on an inline DefaultKafkaProducerFactory whose constructor receives the
		producer configuration map below. Keys are serialized with StringSerializer, values with
		common.vo.NotificationVoSerializer.
		NOTE(review): bootstrap.servers is hard-coded to localhost:9092 in this map.
	-->
	<bean id="template" class="org.springframework.kafka.core.KafkaTemplate">
		<constructor-arg>
			<bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
				<constructor-arg>
					<map>
						<entry key="bootstrap.servers" value="localhost:9092"/>
						<entry key="retries" value="5"/>
						<entry key="batch.size" value="16384"/>
						<entry key="linger.ms" value="1"/>
						<entry key="buffer.memory" value="33554432"/>
						<entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
						<entry key="value.serializer" value="common.vo.NotificationVoSerializer"/>
					</map>
				</constructor-arg>
			</bean>
		</constructor-arg>
	</bean>


	<!--
		Message-driven Kafka inbound adapter: backed by listener container "container1", uses the
		"messageConverter" bean in record mode and sends to ip-chanel-trigger-fcm-notification
		(send-timeout 5000, phase 100, started automatically).
	-->
	<int-kafka:message-driven-channel-adapter id="kafka-inbound-channel-adapter-FCM"
		listener-container="container1"
		message-converter="messageConverter"
		mode="record"
		channel="ip-chanel-trigger-fcm-notification"
		send-timeout="5000"
		phase="100"
		auto-startup="true"/>

	<!-- Converter bean referenced by the message-driven adapter's message-converter attribute. -->
	<bean id="messageConverter" class="org.springframework.kafka.support.converter.MessagingMessageConverter"/>

	<!-- Hands messages from ip-chanel-trigger-fcm-notification to the fcmNotificationConsumer bean. -->
	<int:service-activator
		input-channel="ip-chanel-trigger-fcm-notification"
		ref="fcmNotificationConsumer"/>

	<!--Consumer-->
	<!--
		Listener container for topic trigger-fcm-notification.
		First constructor argument: an inline DefaultKafkaConsumerFactory configured by the map below
		(group trigger-fcm-notification, String keys, values via common.vo.NotificationVoDeserializer).
		Second constructor argument: ContainerProperties naming the topic.
		NOTE(review): auto.commit.interval.ms is presumably unused while enable.auto.commit is "false";
		confirm against the Kafka consumer configuration reference.
	-->
	<bean id="container1" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
		<constructor-arg>
			<bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
				<constructor-arg>
					<map>
						<entry key="bootstrap.servers" value="localhost:9092"/>
						<entry key="enable.auto.commit" value="false"/>
						<entry key="auto.commit.interval.ms" value="100"/>
						<entry key="session.timeout.ms" value="15000"/>
						<entry key="group.id" value="trigger-fcm-notification"/>
						<entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
						<entry key="value.deserializer" value="common.vo.NotificationVoDeserializer"/>
					</map>
				</constructor-arg>
			</bean>
		</constructor-arg>
		<constructor-arg>
			<bean class="org.springframework.kafka.listener.config.ContainerProperties">
				<constructor-arg name="topics" value="trigger-fcm-notification"/>
			</bean>
		</constructor-arg>
	</bean>

</beans>

これは変更された xml 構成ファイルです

~~~~~~~~~~~~~~~

ここで編集:

Consumer クラス:

package common.notification.consumer;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import common.vo.NotificationVo;

@Component
public class FcmNotificationConsumer {

	/**
	 * Debug handler: prints the received map, then prints the value stored
	 * under the last key produced by iterating {@code keySet()}.
	 *
	 * @param payload map handed to this service activator
	 */
	@SuppressWarnings("unchecked")
	@ServiceActivator
	public <K, V> void process(Map<K, V> payload) {
		System.out.println("payload=====>" + payload.toString());

		// Every key is cast to String; only the final one survives the loop.
		String lastKey = null;
		for (K key : payload.keySet()) {
			lastKey = (String) key;
		}

		// lastKey is still null for an empty map, so the lookup becomes get(null).
		Object ackObject = payload.get(lastKey);
		System.out.println("ackObject=====>" + ackObject);
	}
}

O/P:

payload=====>{kafka_offset=21, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=trigger-fcm-notification, kafka_acknowledgment=Acknowledgment for ConsumerRecord(topic = trigger-fcm-notification, partition = 0, offset = 21, CreateTime = 1476887227554, checksum = 222603853, serialized key size = -1, serialized value size = 270, key = null, value = common.vo.NotificationVo@29206bb8)}

ackObject=====>Acknowledgment for ConsumerRecord(topic = trigger-fcm-notification, partition = 0, offset = 21, CreateTime = 1476887227554, checksum = 222603853, serialized key size = -1, serialized value size = 270, key = null, value = common.vo.NotificationVo@29206bb8)

~~~~~~~~~~~~~~~

ここで編集:

Consumer クラスのメソッド パラメータを変更した後、予期されたペイロードを受け取りました。

package common.notification.consumer;

import java.util.HashMap;
import java.util.List;
import java.util.Map;


import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

import common.vo.NotificationVo;

@Component
public class FcmNotificationConsumer {

	/**
	 * Prints the incoming message and casts its payload to {@link NotificationVo}.
	 *
	 * @param message message handed to this service activator
	 */
	@SuppressWarnings("unchecked")
	@ServiceActivator
	public void process(Message<?> message) {
		System.out.println("Message=====>" + message);
		// The cast fails with ClassCastException if the payload is some other type.
		NotificationVo notificationVo = (NotificationVo) message.getPayload();
	}
}

O/P:

Message=====>GenericMessage [payload=common.vo.NotificationVo@4c144e99, headers={kafka_offset=16, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=trigger-fcm-notification, kafka_acknowledgment=Acknowledgment for ConsumerRecord(topic = trigger-fcm-notification, partition = 0, offset = 16, CreateTime = 1476949945607, checksum = 2501578118, serialized key size = -1, serialized value size = 270, key = null, value = common.vo.NotificationVo@4c144e99)}]

ようやく期待どおりに動作するようになりました。

ご支援ありがとうございます。

4

回答 1 件

2

わかった。以前よりきれいになりました。

したがって、現在のコードは Apache Kafka 0.8 用です。バージョン 0.9 から完全に異なるデザインになりました。したがって、Spring Integration Kafka 1.0 の現在のコードは破棄する必要があります。

Apache Kafka 0.10 について読む必要があります: https://kafka.apache.org/documentation

Kafka Client 0.10 に基づく Spring Kafka について: http://docs.spring.io/spring-kafka/docs/1.1.1.RELEASE/reference/html/

そして、Spring Integration Kafka に関する章に注意してください: http://docs.spring.io/spring-kafka/docs/1.1.1.RELEASE/reference/html/_spring_integration.html

そして注意: ここでは誰もあなたの代わりに作業をしてくれるわけではありません。

編集

なぜそのような `payload` を受け取っていると言うのかわかりません。それは `headers` です。`ConsumerRecord` の `value` はメッセージの `payload` に変換されます。

`value` ではなく、それを受け取っているコードを共有してください。`<int-kafka:message-driven-channel-adapter>` は、それらの `headers` と、`ConsumerRecord` の `value` から変換された `payload` を持つ `Message<>` を生成します。

2016-10-18T14:25:22.923 に回答