<int:service-activator input-channel="toKafka" ref="conditionalProducerService" method="producerCircuitBreaker">
<int:request-handler-advice-chain>
<ref bean="circuitBreakerAdvice1" />
</int:request-handler-advice-chain>
</int:service-activator>
<int:channel id="failedChannel2" />
<int-kafka:outbound-channel-adapter
id="kafkaOutboundChannelAdapter" kafka-producer-context-ref="producerContext" auto-startup="false" channel="toKafka" message-key="kafka_messageKey">
<int:poller fixed-delay="1000" error-channel="failedChannel2" />
</int-kafka:outbound-channel-adapter>
<int:chain input-channel="failedChannel2">
<int:transformer expression="'failed:' + payload.failedMessage.payload + ' with ' + payload.cause.message" />
<int-stream:stderr-channel-adapter append-newline="true"/>
</int:chain>
<bean id="circuitBreakerAdvice1" class="org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice">
<property name="threshold" value="2" />
<property name="halfOpenAfter" value="12000" />
</bean>
public Message<?> producerCircuitBreaker(Message<?> payload) {
throw new RuntimeException("foo Pro");}
上記の構成では、次のことを試みています。
1.失敗したメッセージが error-channel="failedChannel2" に伝播されることを期待していますが、これは発生していません。変換された出力がコンソールに表示されなかったためです。
2.CircuitBreaker は ServiceActivator (上記のアプリケーション関連の例外) に対して機能していますが、アウトバウンド アダプターの失敗した場合に CB を構成するにはどうすればよいですか。例:SIチャネルから外部(kafka)サーバーにメッセージを送信する前に、接続がタイムアウトしたか、サーバーが突然ダウンした場合/ネットワーク接続の問題/環境の問題。
Circuit Breaker Advice に関する SI doc に従って、以下を参照してください。
「通常、このアドバイスは外部サービスに使用される可能性があり、失敗するまでに時間がかかる場合があります (ネットワーク接続の試行のタイムアウトなど)」。
これを達成する方法を提案してください。ありがとうございます。
更新された構成:
<int:gateway default-request-channel="toKafka" error-channel="errorChannel"
default-reply-timeout="0" />
<int:service-activator input-channel="toKafka">
<bean class="com.XXX.ProducerMessageHandler" >
<constructor-arg ref="producerContext"/>
</bean>
<int:request-handler-advice-chain>
<ref bean="circuitBreakerAdvice" />
</int:request-handler-advice-chain>
<bean id="transformerService1" class="com.XXX.KafkaTransformerTest" />
<int:transformer input-channel="errorChannel"
order="1" ref="transformerService1" method="transformFailed">
</int:transformer>
public void transformFailed(Message<?> message) {
APPLOGGER.log("transformer message test" + message);
public class ProducerMessageHandler extends KafkaProducerMessageHandler{
public ProducerMessageHandler(KafkaProducerContext kafkaProducerContext) {
super(kafkaProducerContext);
// TODO Auto-generated constructor stub
}
@Override
public void handleMessageInternal(final Message<?> message) throws Exception {
//super.handleMessageInternal(message);
throw new RuntimeException("test foo");
}
ログ:
01-05@23:44:18,598 DEBUG org.springframework.integration.config.ServiceActivatorFactoryBean$1 - org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6 受信メッセージ: GenericMessage [payload=hello, headers={timestamp=1452017658598, id =e0591162-3b93-9bb6-0699-89b15b20e904}] DEBUG: - com.XXX.ProducerMessageHandler#0 受信メッセージ: GenericMessage [payload=hello, headers={timestamp=1452017658598, id=e0591162-3b93-9bb6-0699-89b15b20e90} ] 例外を取得しました: org.springframework.messaging.MessageHandlingException: メッセージ ハンドラでエラーが発生しました [com.XXX.ProducerMessageHandler#0]; ネストされた例外は java.lang.RuntimeException: test foo 01-05@23:44:18,606 DEBUG org.springframework.integration.channel.PublishSubscribeChannel - チャネル 'toKafka' で preSend、メッセージ: GenericMessage [payload=hello, headers={timestamp=1452017658605, id=61597941-b2f8-314d-141d-8f2c058dda4d}] 01-05@23:44:18,606 DEBUG org.springframework.integration.config.ServiceActivatorFactoryBean$1 - org.springframework.integration.config.ServiceActivatorFactoryBean $1@6a0ef4b6 受信メッセージ: GenericMessage [payload=hello, headers={timestamp=1452017658605, id=61597941-b2f8-314d-141d-8f2c058dda4d}] DEBUG: - com.XXX.ProducerMessageHandler#0 受信メッセージ: GenericMessage [payload=hello , headers={timestamp=1452017658605, id=61597941-b2f8-314d-141d-8f2c058dda4d}] 例外が発生しました: org.springframework.messaging.MessageHandlingException: メッセージハンドラでエラーが発生しました [com.XXX.ProducerMessageHandler#0]; ネストされた例外は java.lang.RuntimeException: test foo 01-05@23:44:18,606 DEBUG org.springframework.integration.channel です。PublishSubscribeChannel - チャネル「toKafka」で preSend、メッセージ: GenericMessage [payload=hello, headers={timestamp=1452017658606, id=119afbf1-6104-feb1-eb44-f646aa932277}] 01-05@23:44:18,606 DEBUG org.springframework .integration.config.ServiceActivatorFactoryBean$1 - org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6 受信メッセージ: GenericMessage [payload=hello, headers={timestamp=1452017658606, id=119afbf1-6104-feb1-eb44-f646aa932277}] 得ました例外: org.springframework.messaging.MessageHandlingException: メッセージハンドラでエラーが発生しました [org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6]; ネストされた例外は org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice$CircuitBreakerOpenException: Circuit Breaker is Open for org.springframework です。integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6 01-05@23:44:18,606 デバッグid=8dafe2e0-8efe-c827-e745-1387e6045e7d}] 01-05@23:44:18,606 DEBUG org.springframework.integration.config.ServiceActivatorFactoryBean$1 - org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6 受信メッセージ: GenericMessage [payload=hello, headers={timestamp=1452017658606, id=8dafe2e0-8efe-c827-e745-1387e6045e7d}] 例外が発生しました: org.springframework.messaging.MessageHandlingException: メッセージハンドラでエラーが発生しました [org.springframework.integration.config. ServiceActivatorFactoryBean$1@6a0ef4b6]; ネストされた例外は org.springframework.integration です。