Reactor Netty では、 経由out.send(publisher)
で TCP チャネルにデータを送信する場合、任意のパブリッシャーが動作することが期待されます。ただし、単純な即時型の代わりに、Flux
遅延要素を含むより複雑な即時型を使用すると、正しく機能しなくなります。たとえば、この hello world TCP エコー サーバーを使用すると、期待どおりに動作します。
import reactor.core.publisher.Flux;
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;
import java.time.Duration;
public class Reactor1 {
public static void main(String[] args) throws Exception {
DisposableServer server = TcpServer.create()
.port(3344)
.handle((in, out) -> in
.receive()
.asString()
.flatMap(s ->
out.sendString(Flux.just(s.toUpperCase()))
))
.bind()
.block();
server.channel().closeFuture().sync();
}
}
ただし、次のように変更out.sendString
すると、
out.sendString(Flux.just(s.toUpperCase()).delayElements(Duration.ofSeconds(1)))
次に、受信したアイテムごとに、1 秒の遅延で出力が生成されることが期待されます。
ただし、サーバーが動作する方法は、間隔中に複数のアイテムを受信した場合、最初のアイテムに対してのみ出力を生成するというものです。たとえば、以下では最初の 1 秒間にaa
andを入力しますが、(1 秒後) 出力としてのみ生成されます。bb
AA
$ nc localhost 3344
aa
bb
AA <after one second>
次に、後で追加の行を入力すると、(1 秒後に) 出力が得られますが、前の入力からのものです。
cc
BB <after one second>
send()
遅延で期待どおりに動作させる方法はありFlux
ますか?