2

Reactor Netty では、 経由out.send(publisher)で TCP チャネルにデータを送信する場合、任意のパブリッシャーが動作することが期待されます。ただし、単純な即時型の代わりに、Flux遅延要素を含むより複雑な即時型を使用すると、正しく機能しなくなります。たとえば、この hello world TCP エコー サーバーを使用すると、期待どおりに動作します。

import reactor.core.publisher.Flux;
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;

import java.time.Duration;

public class Reactor1 {
    public static void main(String[] args) throws Exception {
        DisposableServer server = TcpServer.create()
            .port(3344)
            .handle((in, out) -> in
                .receive()
                .asString()
                .flatMap(s ->
                    out.sendString(Flux.just(s.toUpperCase()))
                ))
            .bind()
            .block();
        server.channel().closeFuture().sync();
    }
}

ただし、次のように変更out.sendStringすると、

out.sendString(Flux.just(s.toUpperCase()).delayElements(Duration.ofSeconds(1)))

次に、受信したアイテムごとに、1 秒の遅延で出力が生成されることが期待されます。

ただし、サーバーが動作する方法は、間隔中に複数のアイテムを受信した場合、最初のアイテムに対してのみ出力を生成するというものです。たとえば、以下では最初の 1 秒間にaaandを入力しますが、(1 秒後) 出力としてのみ生成されます。bbAA

$ nc localhost 3344
aa
bb
AA <after one second>

次に、後で追加の行を入力すると、(1 秒後に) 出力が得られますが、前の入力からのものです。

cc
BB <after one second>

send()遅延で期待どおりに動作させる方法はありFluxますか?

4

2 に答える 2