サーバーから送信されたデータがクライアント要求またはプッシュへの反応のいずれかになる rsocket チャネルを作成したいと考えています。そのためにフラックスマージを使用します。
これは参照データです。クライアントは更新を要求でき、サーバーは更新をプッシュすることもできます。
だから私はこれをサーバー側に持っています:
@MessageMapping("update-stream")
Flux<DomainObject> addUpdatesListener(Flux<RefreshRequest> requests) {
Flux<DomainObject> pushFlux = Flux.from(this.flux)
.doOnError((e) -> log.error("Error on push flux : {}", e, e));
return requests
.map(this::getUpdates)
.flatMap(Flux::fromIterable)
.doOnError((e) -> log.error("Error on channel flux : {}", e, e))
.mergeWith(pushFlux)
.doOnError((e) -> log.error("Error on merged flux : {}", e, e));
}
クライアントを停止すると、次のエラーが発生することを除いて機能します。
06-07-2020 15:58:53.168 [reactor-http-nio-3] ERROR reactor.core.publisher.Operators.error - Operator called default onErrorDropped
java.util.concurrent.CancellationException: Disposed
at reactor.core.publisher.FluxProcessor.dispose(FluxProcessor.java:80)
at io.rsocket.core.RSocketResponder$3.hookOnCancel(RSocketResponder.java:513)
at reactor.core.publisher.BaseSubscriber.cancel(BaseSubscriber.java:230)
at java.base/java.lang.Iterable.forEach(Iterable.java:75)
at io.rsocket.core.RSocketResponder.cleanUpSendingSubscriptions(RSocketResponder.java:275)
at io.rsocket.core.RSocketResponder.cleanup(RSocketResponder.java:265)
at io.rsocket.core.RSocketResponder.tryTerminate(RSocketResponder.java:167)
at io.rsocket.core.RSocketResponder.tryTerminateOnConnectionClose(RSocketResponder.java:160)
at reactor.core.publisher.LambdaMonoSubscriber.onComplete(LambdaMonoSubscriber.java:132)
at reactor.core.publisher.MonoProcessor$NextInner.onComplete(MonoProcessor.java:518)
at reactor.core.publisher.MonoProcessor.onNext(MonoProcessor.java:308)
at reactor.core.publisher.MonoProcessor.onComplete(MonoProcessor.java:265)
at io.rsocket.internal.BaseDuplexConnection.dispose(BaseDuplexConnection.java:23)
at io.rsocket.transport.netty.TcpDuplexConnection.lambda$new$0(TcpDuplexConnection.java:60)
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:570)
at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:549)
at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:604)
at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104)
at io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:84)
at io.netty.channel.AbstractChannel$CloseFuture.setClosed(AbstractChannel.java:1158)
at io.netty.channel.AbstractChannel$AbstractUnsafe.doClose0(AbstractChannel.java:760)
at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:736)
at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:607)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.closeOnRead(AbstractNioByteChannel.java:105)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:171)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:834)
マージを行わなければ、エラーはありません。
私は多くの異なるバージョンを試しましたが、両方のプッシュとエラーなしでクライアントを終了させる方法を見つけることができません。
何が欠けていますか?
どうもありがとうございます。