2

org.springframework.web.reactive.socket.WebSocketMessageを消費し、Netty の を使用したペイロードの処理を含む、いくつかの作業を行うSI フローがありByteBufます。ある時点で、フローで例外が発生しました。

org.springframework.messaging.MessageHandlingException: メッセージハンドラでエラーが発生しました [_org.springframework.integration.errorLogger.handler]; ネストされた例外は io.netty.util.IllegalReferenceCountException: refCnt: 0 です
    org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:184) で ~[spring-integration-core-5.0.5.RELEASE.jar!/:5.0.5.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:175) ~[spring-integration-core-5.0.5.RELEASE.jar!/:5.0.5.RELEASE]
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:224) ~[spring-integration-core-5.0.5.RELEASE.jar!/:5.0.5.RELEASE]
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:180) ~[spring-integration-core-5.0.5.RELEASE.jar!/:5.0.5.RELEASE]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.0.5.RELEASE.jar!/:5.0.5.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445) ~[spring-integration-core-5.0.5.RELEASE.jar!/:5.0.5.RELEASE]
    org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181) で ~[spring-messaging-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160) で ~[spring-messaging-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) で ~[spring-messaging-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108) ~[spring-messaging-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    org.springframework.integration.channel.MessagePublishingErrorHandler.handleError(MessagePublishingErrorHandler.java:93) で ~[spring-integration-core-5.0.5.RELEASE.jar!/:5.0.5.RELEASE]
...
原因: io.netty.util.IllegalReferenceCountException: refCnt: 0
    io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1417) で ~[netty-buffer-4.1.24.Final.jar!/:4.1.24.Final]
    at io.netty.buffer.AbstractByteBuf.checkIndex(AbstractByteBuf.java:1356) ~[netty-buffer-4.1.24.Final.jar!/:4.1.24.Final]
    at io.netty.buffer.AbstractByteBuf.getInt(AbstractByteBuf.java:417) ~[netty-buffer-4.1.24.Final.jar!/:4.1.24.Final]
    at io.netty.buffer.ByteBufUtil.hashCode(ByteBufUtil.java:175) ~[netty-buffer-4.1.24.Final.jar!/:4.1.24.Final]
    at io.netty.buffer.AbstractByteBuf.hashCode(AbstractByteBuf.java:1315) ~[netty-buffer-4.1.24.Final.jar!/:4.1.24.Final]
    at org.springframework.core.io.buffer.NettyDataBuffer.hashCode(NettyDataBuffer.java:288) ~[spring-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    org.springframework.web.reactive.socket.WebSocketMessage.hashCode(WebSocketMessage.java:134) で ~[spring-webflux-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    java.lang.Object.toString(Object.java:236) ~[?:1.8.0_161] で
    java.lang.String.valueOf(String.java:2994) ~[?:1.8.0_161] で
    java.lang.StringBuilder.append(StringBuilder.java:131) で ~[?:1.8.0_161]

その後、すべてのバイナリ Web ソケット メッセージの処理が次の例外で失敗します。

2018-11-26T10:38:29,133 エラー --- [-server-epoll-7] osihLoggingHandler (:) org.springframework.messaging.MessageDeliveryException: チャネル 'binaryWebSocketMessageChannel' にメッセージを送信できませんでした。ネストされた例外は java.lang.IllegalStateException: [binaryWebSocketMessageChannel] には、メッセージを受け入れるサブスクライバーがありません。 id=3e0be929、uri=http://localhost:8080/])、ヘッダー={id=b09a89ff-f7be-1b43-6f62-40e5c0b5695a、タイムスタンプ=1543225109132}]
    org.springframework.integration.support.utils.IntegrationUtils.wrapInDeliveryExceptionIfNecessary(IntegrationUtils.java:163) で
    org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:475) で
    org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394) で
    org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181) で
    org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160) で
    org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) で
    org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108) で
    org.springframework.integration.router.AbstractMessageRouter.handleMessageInternal(AbstractMessageRouter.java:183) で
    org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158) で
    org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:205) で
    org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:55) で
    org.springframework.integration.endpoint.ReactiveStreamsConsumer$1.hookOnNext(ReactiveStreamsConsumer.java:138) で
    org.springframework.integration.endpoint.ReactiveStreamsConsumer$1.hookOnNext(ReactiveStreamsConsumer.java:127) で
    reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:158) で
    reactor.core.publisher.FluxRetry$RetrySubscriber.onNext(FluxRetry.java:79) で
...
原因: java.lang.IllegalStateException: [binaryWebSocketMessageChannel] には、メッセージを受け入れるサブスクライバーがありません
    org.springframework.util.Assert.state(Assert.java:94) で
    org.springframework.integration.channel.FluxMessageChannel.doSend(FluxMessageChannel.java:63) で
    org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445) で
    ... さらに 57 件

問題を解決するために、誰かが私を任意の方向に向けることができますか? また、SI EIP コンポーネント (ルーター、トランスフォーマー、フィルター、サービス アクティベーター) がチャネルからサブスクライブ解除するのはどのような場合ですか?

参考までに、チャンネルタイプはorg.springframework.integration.channel.FluxMessageChannel

編集:

私のフローは次のようになります。

WebSocketMessage -> router: (BINARY)  -> binaryWebSocketMessageChannel -> ...
                            (!BINARY) -> nullChannel

(フィルターがこちらの方が適していることはわかっています。後でリファクタリングする予定です)

@ArtemBilan 例のレポはこちら: https://github.com/ioreskovic/Spring-Integration-flow-loses-subscriber

4

1 に答える 1

2

ポイントは、その Spring Integration バージョンでキャンセルされていることですPublisherFluxMessageChannel

バージョンonErrorContinue()の Reactorから使い始めました。問題を解決するには、アプリケーションを最新の Spring Boot にアップグレードすることを検討することをお勧めします。3.25.12.1.1

私たちの回避策として、例外を飲み込みBinaryWsmToBytesTransformer、後ろに泡立てないようにすることを検討できFluxMessageChannelます。

于 2018-11-30T19:35:35.670 に答える