2

Kafka を 0.9.0 から 0.10.0 にアップグレードする際、異なるプロデューサーを異なるトピックに設定する際に問題に直面します。以下に示す XML 構成

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
	xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
	xmlns:task="http://www.springframework.org/schema/task"
	xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
		http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
		http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
		http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

	<int:publish-subscribe-channel id="inputToKafka" />

	<!-- Producer Config -->

	<int-kafka:outbound-channel-adapter
		id="fcmOutboundChannelAdapter" kafka-template="fcmNotificationTemplate" topic="trigger-fcm-notification"
		auto-startup="true" channel="inputToKafka">
		<int-kafka:request-handler-advice-chain>
			<bean
				class="org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice" />
		</int-kafka:request-handler-advice-chain>
	</int-kafka:outbound-channel-adapter>
	
	<int-kafka:outbound-channel-adapter
		id="masOutboundChannelAdapter" kafka-template="microsoftAccountSyncTemplate" topic="sync-microsoft-account"
		auto-startup="true" channel="inputToKafka">
		<int-kafka:request-handler-advice-chain>
			<bean
				class="org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice" />
		</int-kafka:request-handler-advice-chain>
	</int-kafka:outbound-channel-adapter>
	
	<bean id="fcmNotificationTemplate" class="org.springframework.kafka.core.KafkaTemplate">
		<constructor-arg>
			<bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
				<constructor-arg>
					<map>
						<entry key="bootstrap.servers" value="localhost:9092" />
						<entry key="retries" value="0" />
						<entry key="batch.size" value="16384" />						
						<entry key="linger.ms" value="0" />
						<entry key="buffer.memory" value="33554432" />
						<entry key="key.serializer"
							value="org.apache.kafka.common.serialization.StringSerializer" />
						<entry key="value.serializer"
							value="common.serializer.FcmNotificationVoSerializer" />
					</map>
				</constructor-arg>
			</bean>
		</constructor-arg>
	</bean>

	<bean id="microsoftAccountSyncTemplate" class="org.springframework.kafka.core.KafkaTemplate">
		<constructor-arg>
			<bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
				<constructor-arg>
					<map>
						<entry key="bootstrap.servers" value="localhost:9092" />
						<entry key="retries" value="0" />
						<entry key="batch.size" value="16384" />
						<entry key="buffer.memory" value="33554432" />	
						<entry key="linger.ms" value="0" />
						<entry key="key.serializer"
							value="org.apache.kafka.common.serialization.StringSerializer" />
						<entry key="value.serializer"
							value="common.serializer.MicrosoftAccountSyncRequestVoSerializer" />
					</map>
				</constructor-arg>
			</bean>
		</constructor-arg>
	</bean>

	<int-kafka:message-driven-channel-adapter
		id="kafka-message-channel-adapter-FCM" listener-container="fcmContainer"
		auto-startup="true" phase="100" send-timeout="5000"
		channel="ip-chanel-trigger-fcm-notification" mode="record"
		message-converter="messageConverter" />

	<int-kafka:message-driven-channel-adapter
		id="kafka-message-channel-adapter-SMA" listener-container="microsoftAccountSyncContainer"
		auto-startup="true" phase="100" send-timeout="5000"
		channel="ip-chanel-sync-microsoft-account" mode="record"
		message-converter="messageConverter" />
		
	<bean id="messageConverter"
		class="org.springframework.kafka.support.converter.MessagingMessageConverter" />

	<!-- Consumer Config -->
	<int:service-activator input-channel="ip-chanel-trigger-fcm-notification"
		ref="fcmNotificationConsumer">
	</int:service-activator>
	
	<int:service-activator input-channel="ip-chanel-sync-microsoft-account"
		ref="syncMicrosoftAccountConsumer">
	</int:service-activator>
	
	<bean id="fcmContainer"
		class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
		<constructor-arg>
			<bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
				<constructor-arg>
					<map>
						<entry key="bootstrap.servers" value="localhost:9092" />
						<entry key="enable.auto.commit" value="true" />
						<entry key="auto.commit.interval.ms" value="100" />
						<entry key="session.timeout.ms" value="15000" />
						<entry key="group.id" value="trigger-fcm-notification" />
						<entry key="key.deserializer"
							value="org.apache.kafka.common.serialization.StringDeserializer" />
						<entry key="value.deserializer"
							value="common.deserializer.FcmNotificationVoDeserializer" />
					</map>
				</constructor-arg>
			</bean>
		</constructor-arg>
		<constructor-arg>
			<bean class="org.springframework.kafka.listener.config.ContainerProperties">
				<constructor-arg name="topics" value="trigger-fcm-notification" />
			</bean>
		</constructor-arg>
	</bean>
		
	<bean id="microsoftAccountSyncContainer"
		class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
		<constructor-arg>
			<bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
				<constructor-arg>
					<map>
						<entry key="bootstrap.servers" value="localhost:9092" />
						<entry key="enable.auto.commit" value="true" />
						<entry key="auto.commit.interval.ms" value="100" />
						<entry key="session.timeout.ms" value="15000" />
						<entry key="group.id" value="sync-microsoft-account" />
						<entry key="key.deserializer"
							value="org.apache.kafka.common.serialization.StringDeserializer" />
						<entry key="value.deserializer"
							value="common.deserializer.MicrosoftAccountSyncRequestVoDeserializer" />
					</map>
				</constructor-arg>
			</bean>
		</constructor-arg>
		<constructor-arg>
			<bean class="org.springframework.kafka.listener.config.ContainerProperties">
				<constructor-arg name="topics" value="sync-microsoft-account" />
			</bean>
		</constructor-arg>
	</bean>

</beans>

2 つのトピックを個別に公開中にエラーが発生しました。スタックトレースは以下の通り

(java.lang.String,java.lang.String,java.lang.String,java.util.Locale,org.springframework.ui.Model,java.security.Principal)]: org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler#1]; nested exception is org.apache.kafka.common.errors.SerializationException: Can't convert value of class common.vo.GcmNotificationVo to class common.serializer.MicrosoftAccountSyncRequestVoSerializer specified in value.serializer
2016-10-20 18:12:53,849 [http-nio-8080-exec-4] DEBUG org.springframework.web.servlet.DispatcherServlet - Could not complete request
org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler#1]; nested exception is org.apache.kafka.common.errors.SerializationException: Can't convert value of class common.vo.GcmNotificationVo to class common.serializer.MicrosoftAccountSyncRequestVoSerializer specified in value.serializer
	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:139)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:333)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
	at org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice$1.execute(AbstractRequestHandlerAdvice.java:75)
	at org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice.doInvoke(RequestHandlerCircuitBreakerAdvice.java:62)
	at org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice.invoke(AbstractRequestHandlerAdvice.java:70)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
	at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:213)
	at com.sun.proxy.$Proxy52.handleMessage(Unknown Source)
	at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:236)
	at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:185)
	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423)
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373)
	at service.impl.AdminOperationsServiceImpl.publishToQueue(AdminOperationsServiceImpl.java:1191)
	at service.impl.AdminOperationsServiceImpl.update(AdminOperationsServiceImpl.java:1366)
	at service.TenantDocumentsController.update(TenantDocumentsController.java:277)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:221)
	at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:136)
	at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:114)
	at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:827)
	at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:738)
	at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:85)

2 つの個別の Serializer クラスと Deserializer クラスを定義しました。しかし、どのように内部的に他のクラスを参照しているのでしょうか? 構成を見逃していませんか?

4

1 に答える 1

1

Kafka に送信するため、 の件名はありませんDeserializer。StackTrace によると、GcmNotificationVoオブジェクトを に送信する REST サービスを実行しますinputToKafka

ここで、2 番目のサブスクライバーは、 を使用してそのオブジェクトの Kafka シリアル化を実行できませんcommon.serializer.MicrosoftAccountSyncRequestVoSerializer

たぶん、masOutboundChannelAdapter別の操作に使用するというあなたのアイデアはありますか?したがって、新しいセパレートchannel

于 2016-10-20T15:15:36.013 に答える