シンプルな RSocket と Spring Boot サーバーがあるとします。サーバーは、すべての着信クライアント メッセージを、接続されているすべてのクライアント (送信者を含む) にブロードキャストします。クライアントとサーバーは次のようになります。
サーバ:
public RSocketController() {
this.processor = DirectProcessor.<String>create().serialize();
this.sink = this.processor.sink();
}
@MessageMapping("channel")
Flux<String> channel(final Flux<String> messages) {
this.registerProducer(messages);
// breakpoint here
return processor
.doOnSubscribe(subscription -> logger.info("sub"))
.doOnNext(message -> logger.info("[Sent] " + message));
}
private Disposable registerProducer(Flux<String> flux) {
return flux
.doOnNext(message -> logger.info("[Received] " + message))
.map(String::toUpperCase)
// .delayElements(Duration.ofSeconds(1))
.subscribe(this.sink::next);
}
クライアント:
@ShellMethod("Connect to the server")
public void connect(String name) {
this.name = name;
this.rsocketRequester = rsocketRequesterBuilder
.rsocketStrategies(rsocketStrategies)
.connectTcp("localhost", 7000)
.block();
}
@ShellMethod("Establish a channel")
public void channel() {
this.rsocketRequester
.route("channel")
.data(this.fluxProcessor.doOnNext(message -> logger.info("[Sent] {}", message)))
.retrieveFlux(String.class)
.subscribe(message -> logger.info("[Received] {}", message));
}
@ShellMethod("Send a lower case message")
public void send(String message) {
this.fluxSink.next(message.toLowerCase());
}
問題は、クライアントが送信する最初のメッセージがサーバーによって処理されますが、送信者に再び届かないことです。後続のメッセージはすべて問題なく配信されます。すでに接続されている他のすべてのクライアントは、すべてのメッセージを受信します。
デバッグ中にこれまでに気づいたこと
- クライアントで channel() を呼び出すと、retrieveFlux() と subscribe() が呼び出されます。ただし、サーバーでは、対応するメソッドでブレークポイントがトリガーされません。
- クライアントが send() を使用して最初のメッセージを送信した場合にのみ、サーバーでブレークポイントがトリガーされます。
- サーバーで .delayElements() を使用すると、問題が「解決」されるようです。
ここで何が間違っていますか?また、サーバーのブレークポイントをトリガーするために最初に send() が必要なのはなぜですか?
前もって感謝します!