1
<int:service-activator input-channel="toKafka"  ref="conditionalProducerService" method="producerCircuitBreaker">

  <int:request-handler-advice-chain>
       <ref bean="circuitBreakerAdvice1" />
   </int:request-handler-advice-chain>
            </int:service-activator>

  <int:channel id="failedChannel2" />
  <int-kafka:outbound-channel-adapter
                            id="kafkaOutboundChannelAdapter" kafka-producer-context-ref="producerContext" auto-startup="false" channel="toKafka" message-key="kafka_messageKey">
                            <int:poller fixed-delay="1000" error-channel="failedChannel2" />
            </int-kafka:outbound-channel-adapter>


      <int:chain input-channel="failedChannel2">
        <int:transformer expression="'failed:' + payload.failedMessage.payload + ' with ' + payload.cause.message" />
                            <int-stream:stderr-channel-adapter append-newline="true"/>
            </int:chain>

            <bean id="circuitBreakerAdvice1" class="org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice">
                            <property name="threshold" value="2" />                                         
                            <property name="halfOpenAfter" value="12000" />                     
            </bean>

  public Message<?> producerCircuitBreaker(Message<?> payload) {
          throw  new RuntimeException("foo Pro");}

上記の構成では、次のことを試みています。

1.失敗したメッセージが error-channel="failedChannel2" に伝播されることを期待していますが、これは発生していません。変換された出力がコンソールに表示されなかったためです。

2.CircuitBreaker は ServiceActivator (上記のアプリケーション関連の例外) に対して機能していますが、アウトバウンド アダプターの失敗した場合に CB を構成するにはどうすればよいですか。例:SIチャネルから外部(kafka)サーバーにメッセージを送信する前に、接続がタイムアウトしたか、サーバーが突然ダウンした場合/ネットワーク接続の問題/環境の問題。

Circuit Breaker Advice に関する SI doc に従って、以下を参照してください。

「通常、このアドバイスは外部サービスに使用される可能性があり、失敗するまでに時間がかかる場合があります (ネットワーク接続の試行のタイムアウトなど)」。

これを達成する方法を提案してください。ありがとうございます。

更新された構成:

        <int:gateway default-request-channel="toKafka" error-channel="errorChannel"
default-reply-timeout="0" />

 <int:service-activator input-channel="toKafka">
<bean class="com.XXX.ProducerMessageHandler" >
 <constructor-arg ref="producerContext"/>
</bean>
     <int:request-handler-advice-chain>
                                     <ref bean="circuitBreakerAdvice" />
                       </int:request-handler-advice-chain>

<bean id="transformerService1" class="com.XXX.KafkaTransformerTest" />

 <int:transformer input-channel="errorChannel"
                              order="1" ref="transformerService1" method="transformFailed">

                       </int:transformer>  

 public void transformFailed(Message<?> message) {
          APPLOGGER.log("transformer message test" + message);


 public class ProducerMessageHandler extends KafkaProducerMessageHandler{

            public ProducerMessageHandler(KafkaProducerContext kafkaProducerContext) {
                            super(kafkaProducerContext);
                            // TODO Auto-generated constructor stub
            }

            @Override
            public void handleMessageInternal(final Message<?> message) throws Exception {

                            //super.handleMessageInternal(message);
                            throw new RuntimeException("test foo");
            }

ログ:

01-05@23:44:18,598 DEBUG org.springframework.integration.config.ServiceActivatorFactoryBean$1 - org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6 受信メッセージ: GenericMessage [payload=hello, headers={timestamp=1452017658598, id =e0591162-3b93-9bb6-0699-89b15b20e904}] DEBUG: - com.XXX.ProducerMessageHandler#0 受信メッセージ: GenericMessage [payload=hello, headers={timestamp=1452017658598, id=e0591162-3b93-9bb6-0699-89b15b20e90} ] 例外を取得しました: org.springframework.messaging.MessageHandlingException: メッセージ ハンドラでエラーが発生しました [com.XXX.ProducerMessageHandler#0]; ネストされた例外は java.lang.RuntimeException: test foo 01-05@23:44:18,606 DEBUG org.springframework.integration.channel.PublishSubscribeChannel - チャネル 'toKafka' で preSend、メッセージ: GenericMessage [payload=hello, headers={timestamp=1452017658605, id=61597941-b2f8-314d-141d-8f2c058dda4d}] 01-05@23:44:18,606 DEBUG org.springframework.integration.config.ServiceActivatorFactoryBean$1 - org.springframework.integration.config.ServiceActivatorFactoryBean $1@6a0ef4b6 受信メッセージ: GenericMessage [payload=hello, headers={timestamp=1452017658605, id=61597941-b2f8-314d-141d-8f2c058dda4d}] DEBUG: - com.XXX.ProducerMessageHandler#0 受信メッセージ: GenericMessage [payload=hello , headers={timestamp=1452017658605, id=61597941-b2f8-314d-141d-8f2c058dda4d}] 例外が発生しました: org.springframework.messaging.MessageHandlingException: メッセージハンドラでエラーが発生しました [com.XXX.ProducerMessageHandler#0]; ネストされた例外は java.lang.RuntimeException: test foo 01-05@23:44:18,606 DEBUG org.springframework.integration.channel です。PublishSubscribeChannel - チャネル「toKafka」で preSend、メッセージ: GenericMessage [payload=hello, headers={timestamp=1452017658606, id=119afbf1-6104-feb1-eb44-f646aa932277}] 01-05@23:44:18,606 DEBUG org.springframework .integration.config.ServiceActivatorFactoryBean$1 - org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6 受信メッセージ: GenericMessage [payload=hello, headers={timestamp=1452017658606, id=119afbf1-6104-feb1-eb44-f646aa932277}] 得ました例外: org.springframework.messaging.MessageHandlingException: メッセージハンドラでエラーが発生しました [org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6]; ネストされた例外は org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice$CircuitBreakerOpenException: Circuit Breaker is Open for org.springframework です。integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6 01-05@23:44:18,606 デバッグid=8dafe2e0-8efe-c827-e745-1387e6045e7d}] 01-05@23:44:18,606 DEBUG org.springframework.integration.config.ServiceActivatorFactoryBean$1 - org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6 受信メッセージ: GenericMessage [payload=hello, headers={timestamp=1452017658606, id=8dafe2e0-8efe-c827-e745-1387e6045e7d}] 例外が発生しました: org.springframework.messaging.MessageHandlingException: メッセージハンドラでエラーが発生しました [org.springframework.integration.config. ServiceActivatorFactoryBean$1@6a0ef4b6]; ネストされた例外は org.springframework.integration です。

4

2 に答える 2

1

アドバイスは、ダウンストリーム フローではなく、割り当てられているエンドポイントにのみ適用されます。残念ながら、kafka スキーマでは、送信チャネル アダプターへの適用が許可されていません。そのためのJIRA 課題を作成しました。

KafkaProducerMessageHandlerをとして設定し<bean/>refからを設定することで回避できます<service-activator/>。次に、サーキットブレーカーを適用できます。

別の回避策は、インフローゲートウェイを使用することです...

<int:service-activator ... ref="gw">
    <int:request-handler-advice-chain ...

</int:service-activator>

<int:gateway id="gw" default-request-channel="toKafka" 
         default-reply-timeout="0"
         error-channel="..." ... />

エラー チャネルにメッセージが表示されない理由がわかりません。通常、DEBUG ロギングをオンにすると、この種のデバッグに役立ちます。

編集

これでテストしたところ、問題なく動作します...

<int:gateway default-request-channel="toKafka" error-channel="errorChannel"
    default-reply-timeout="0" />

<int:service-activator input-channel="toKafka">
    <bean class="com.example.Foo" />
    <int:request-handler-advice-chain>
        <bean class="org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice">
            <property name="threshold" value="2"/>
        </bean>
    </int:request-handler-advice-chain>
</int:service-activator>

EDIT2

ゲートウェイを使用していない場合は、キュー チャネルとポーラーで処理できます。これも私にとってはうまくいきます...

<int:channel id="toKafka">
    <int:queue />
</int:channel>

<int:service-activator input-channel="toKafka">
    <bean class="com.example.Foo" />
    <int:poller error-channel="errorChannel" fixed-delay="1000" />
    <int:request-handler-advice-chain>
        <bean class="org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice">
            <property name="threshold" value="2"/>
            <property name="halfOpenAfter" value="12000"/>
        </bean>
    </int:request-handler-advice-chain>
</int:service-activator>

または、ミッドフロー ゲートウェイを追加できます。

于 2015-12-19T19:29:07.383 に答える