3

spring-cloud-starter-stream-kafka を使用して、Spring Cloud Stream を使用しています。次のように、チャネルを kafka トピックにバインドしましたapplication.properties

spring.cloud.stream.bindings.gatewayOutput.destination=received
spring.cloud.stream.bindings.enrichingInput.destination=received
spring.cloud.stream.bindings.enrichingOutput.destination=enriched
spring.cloud.stream.bindings.redeemingInput.destination=enriched
spring.cloud.stream.bindings.redeemingOutput.destination=redeemed
spring.cloud.stream.bindings.fulfillingInput.destination=redeemed
spring.cloud.stream.bindings.error.destination=errors12
spring.cloud.stream.bindings.errorInput.destination=errors12
spring.cloud.stream.bindings.errorOutput.destination=errors12

プログラムでエラー チャネルに例外メッセージを生成することができません。驚いたことに、私は別のスレッドにいるにもかかわらず、それを生成しようとさえしていないようです (@MessagingGatewayメッセージを にダンプする がgatewayOutputあり、残りのフローは非同期で行われます)。my の定義は次のServiceActivatorとおりです。

@Named
@Configuration
@EnableBinding(Channels.class)
@EnableIntegration
public class FulfillingServiceImpl extends AbstractBaseService implements
        FulfillingService {

    @Override
    @Audit(value = "annotatedEvent")
    @ServiceActivator(inputChannel = Channels.FULFILLING_INPUT, requiresReply = "false")
    public void fulfill(TrivialRedemption redemption) throws Exception {

        logger.error("FULFILLED!!!!!!");

        throw new Exception("test exception");

    }
}

これが生成されたログです (完全な例外を切り捨てました)。ありません...

  • サブスクライバーがいない errorChannel に関する苦情
  • Kafka プロデューサー スレッドのロギング
2016-05-13 12:13:14 pool-6-thread-1 DEBUG KafkaMessageChannelBinder$RecomingHandler:115 - org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$RecomingHandler@2b461688 受信メッセージ: GenericMessage [payload=byte[ 400]、headers={kafka_offset=17、kafka_messageKey=null、kafka_topic=redeemed、kafka_partitionId=0、kafka_nextOffset=18}] - {}
2016-05-13 12:13:14 pool-6-thread-1 DEBUG DirectChannel:430 - チャネル 'fulfillingInput' で preSend、メッセージ: GenericMessage [payload=com.test.system.poc.model.v3.TrivialRedemption@2581ed90 [エンドポイント=[com.test.system.poc.model.v3.Breadcrumb@21be7df8]、orderId=f72b2d9b-4e60-43fa-95d4-1b0b368fe49f、systemCategory=DEMO、systemSubCategory=、プロパティ=、monetaryRedemptionAmount=456.78]、ヘッダー= {kafka_offset=17, kafka_messageKey=null, kafka_topic=redeemed, kafka_partitionId=0, kafka_nextOffset=18, contentType=application/x-java-object;type=com.test.system.poc.model.v3.TrivialRedemption}] - { }
2016-05-13 12:13:14 pool-6-thread-1 DEBUG ServiceActivatingHandler:115 - [org.springframework.integration.handler.MethodInvokingMessageProcessor@64bce7ab] の ServiceActivator (fulfillingServiceImpl.fulfill.serviceActivator.handler) がメッセージを受信しました: GenericMessage [payload=com.test.system.poc.model.v3.TrivialRedemption@2581ed90[endpoints=[com.test.system.poc.model.v3.Breadcrumb@21be7df8],orderId=f72b2d9b-4e60-43fa-95d4-1b0b368fe49f] ,systemCategory=DEMO,systemSubCategory=,properties=,monetaryRedemptionAmount=456.78], headers={kafka_offset=17, kafka_messageKey=null, kafka_topic=redeemed, kafka_partitionId=0, kafka_nextOffset=18, contentType=application/x-Java-object;type =com.test.system.poc.model.v3.TrivialRedemption}] - {}
2016-05-13 12:13:14 pool-6-thread-1 DEBUG DefaultListableBeanFactory:251 - シングルトン Bean 'integrationEvaluationContext' のキャッシュされたインスタンスを返します - {}
2016-05-13 12:13:14 pool-6-thread-1 DEBUG DefaultListableBeanFactory:251 - シングルトン Bean 'integrationConversionService' のキャッシュされたインスタンスを返します - {}
2016-05-13 12:13:14 プール 6 スレッド 1 エラー FulfillingServiceImpl$$EnhancerBySpringCGLIB$$9dad62:42 - 完了!!!!!! - {}
2016-05-13 12:13:14 pool-6-thread-1 ERROR LoggingErrorHandler:35 - 処理中のエラー: KafkaMessage [メッセージ (マジック = 0、属性 = 0、crc = 3373691507、キー = null、ペイロード = Java. nio.HeapByteBuffer[pos=0 lim=400 cap=400])、KafkaMessageMetadata [オフセット=17、nextOffset=18、パーティション[トピック='redeemed'、id=0]] - {}
...
...
2016-05-13 12:13:14 kafka-fetch-1 TRACE BoundedByteBufferReceive:36 - 40 バイトが読み取られました。- {}
2016-05-13 12:13:14 kafka-fetch-1 TRACE DefaultConnection:126 - パーティションからの読み取り[topic='enriched', id=0]@18 - {}
2016-05-13 12:13:14 kafka-fetch-1 TRACE BoundedByteBufferSend:36 - 60 バイトが書き込まれました。- {}
2016-05-13 12:13:14 kafka-fetch-1 TRACE BoundedByteBufferReceive:36 - 40 バイトが読み取られました。- {}
2016-05-13 12:13:14 kafka-fetch-1 TRACE DefaultConnection:126 - パーティションからの読み取り[topic='redeemed', id=0]@18 - {}
2016-05-13 12:13:14 kafka-fetch-1 TRACE BoundedByteBufferSend:36 - 60 バイトが書き込まれました。- {}
2016-05-13 12:13:15 kafka-fetch-1 TRACE BoundedByteBufferReceive:36 - 40 バイトが読み取られました。- {}
2016-05-13 12:13:15 kafka-fetch-1 TRACE DefaultConnection:126 - パーティションからの読み取り[topic='errors12', id=0]@0 - {}
2016-05-13 12:13:15 kafka-fetch-1 TRACE BoundedByteBufferSend:36 - 60 バイトが書き込まれました。- {}
2016-05-13 12:13:15 kafka-fetch-1 TRACE BoundedByteBufferReceive:36 - 40 バイトが読み取られました。- {}
2016-05-13 12:13:15 kafka-fetch-1 TRACE BoundedByteBufferSend:36 - 60 バイトが書き込まれました。- {}

編集:これが私のチャンネルクラスの内容です:

public interface Channels {

    public static final String GATEWAY_OUTPUT = "gatewayOutput";

    public static final String ENRICHING_INPUT = "enrichingInput";
    public static final String ENRICHING_OUTPUT = "enrichingOutput";

    public static final String REDEEMING_INPUT = "redeemingInput";
    public static final String REDEEMING_OUTPUT = "redeemingOutput";

    public static final String FULFILLING_INPUT = "fulfillingInput";
    public static final String FULFILLING_OUTPUT = "fulfillingOutput";

    @Output(GATEWAY_OUTPUT)
    MessageChannel gatewayOutput();

    @Input(ENRICHING_INPUT)
    MessageChannel enrichingInput();

    @Output(ENRICHING_OUTPUT)
    MessageChannel enrichingOutput();

    @Input(REDEEMING_INPUT)
    MessageChannel redeemingInput();

    @Output(REDEEMING_OUTPUT)
    MessageChannel redeemingOutput();

    @Input(FULFILLING_INPUT)
    MessageChannel fulfillingInput();

    @Output(FULFILLING_OUTPUT)
    MessageChannel fulfillingOutput();
4

1 に答える 1

1

クラスは表示Channelsされませんが、バインダーは「エラー」チャネルが「特別」であることを知りません。

バインダーは、再試行を使用して構成し、例外を配信不能トピックにルーティングすることができます。1.0.0.RELEASE にあるこの PRを参照してください。

または、サービス アクティベーターの前に「ミッドフロー」ゲートウェイを追加することもできます。Java の「try/catch」ブロックのように考えてください。

@MessageEndpoint
public static class GatewayInvoker {

    @Autowired
    private ErrorHandlingGateway gw;

    @ServiceActivator(inputChannel = Channels.FULFILLING_INPUT)
    public void send(Message<?> message) {
        this.gw.send(message);
    }

}

@Bean
public GatewayInvoker gate() {
    return new GatewayInvoker();
}

@MessagingGateway(defaultRequestChannel = "toService", errorChannel = Channels.ERRORS)
public interface ErrorHandlingGateway {

    void send(Message<?> message);

}

サービス アクティベーターの入力チャネルを に変更しますtoService

フレームワークがインターフェイスを検出し、そのプロキシを構築@IntegrationComponentScanできるように、構成クラスに追加する必要があります。@MessagingGateway

編集

私に提案されたもう 1 つの代替案ExpressionEvaluatingAdviceは、サービス アクティベーターのアドバイス チェーンに を追加することです。

于 2016-05-13T18:00:41.480 に答える