1

シンプルな RSocket と Spring Boot サーバーがあるとします。サーバーは、すべての着信クライアント メッセージを、接続されているすべてのクライアント (送信者を含む) にブロードキャストします。クライアントとサーバーは次のようになります。

サーバ:

  public RSocketController() {
    this.processor = DirectProcessor.<String>create().serialize();
    this.sink = this.processor.sink();
  }

  @MessageMapping("channel")
  Flux<String> channel(final Flux<String> messages) {
    this.registerProducer(messages);
    // breakpoint here
    return processor
        .doOnSubscribe(subscription -> logger.info("sub"))
        .doOnNext(message -> logger.info("[Sent] " + message));
  }

  private Disposable registerProducer(Flux<String> flux) {
    return flux
        .doOnNext(message -> logger.info("[Received] " + message))
        .map(String::toUpperCase)
        // .delayElements(Duration.ofSeconds(1))
        .subscribe(this.sink::next);
  }

クライアント:

  @ShellMethod("Connect to the server")
  public void connect(String name) {
    this.name = name;
    this.rsocketRequester = rsocketRequesterBuilder
        .rsocketStrategies(rsocketStrategies)
        .connectTcp("localhost", 7000)
        .block();
  }

  @ShellMethod("Establish a channel")
  public void channel() {
    this.rsocketRequester
        .route("channel")
        .data(this.fluxProcessor.doOnNext(message -> logger.info("[Sent] {}", message)))
        .retrieveFlux(String.class)
        .subscribe(message -> logger.info("[Received] {}", message));
  }

  @ShellMethod("Send a lower case message")
  public void send(String message) {
    this.fluxSink.next(message.toLowerCase());
  }

問題は、クライアントが送信する最初のメッセージがサーバーによって処理されますが、送信者に再び届かないことです。後続のメッセージはすべて問題なく配信されます。すでに接続されている他のすべてのクライアントは、すべてのメッセージを受信します。

デバッグ中にこれまでに気づいたこと

  • クライアントで channel() を呼び出すと、retrieveFlux() と subscribe() が呼び出されます。ただし、サーバーでは、対応するメソッドでブレークポイントがトリガーされません。
  • クライアントが send() を使用して最初のメッセージを送信した場合にのみ、サーバーでブレークポイントがトリガーされます。
  • サーバーで .delayElements() を使用すると、問題が「解決」されるようです。

ここで何が間違っていますか?また、サーバーのブレークポイントをトリガーするために最初に send() が必要なのはなぜですか?

前もって感謝します!

4

1 に答える 1