1

私は調査を行い、ドキュメントを読みましたが、それらはあまり理解できません。私が達成しようとしているのは、次の機能です。

Spring Reactor プロジェクトを使用し、eventBus を使用しています。イベント バスがモジュール A にイベントをスローしています。

モジュール A はイベントを受け取り、一意の値を保持する Hot Stream に挿入する必要があります。250 ミリ秒ごとに、ストリームはすべての値を取得して計算を行う必要があります。

例: eventBus は次の番号のイベントをスローしています: 1,2,3,2,3,2

ストリームは一意の値を取得して保持する必要があります-> 1、2、3 250ミリ秒後、ストリームは数値と空の値を出力する必要があります

開始方法を知っている人はいますか? 例を試してみましたが、実際には何も機能せず、何かを理解していないと思います。誰にも例がありますか?

TNX

編集:

次のことをしようとすると、常に例外が発生します:

        Stream<List<Integer>> s = Streams.wrap(p).buffer(1, TimeUnit.SECONDS);

        s.consume(i -> System.out.println(Thread.currentThread() + " data=" + i));

        for (int i = 0; i < 10000; i++) {
            p.onNext(i);
        }

例外:

java.lang.IllegalStateException: The environment has not been initialized yet
    at reactor.Environment.get(Environment.java:156) ~[reactor-core-2.0.7.RELEASE.jar:?]
    at reactor.Environment.timer(Environment.java:184) ~[reactor-core-2.0.7.RELEASE.jar:?]
    at reactor.rx.Stream.getTimer(Stream.java:3052) ~[reactor-stream-2.0.7.RELEASE.jar:?]
    at reactor.rx.Stream.buffer(Stream.java:2246) ~[reactor-stream-2.0.7.RELEASE.jar:?]
    at com.ta.ng.server.controllers.user.UserController.getUsersByOrgId(UserController.java:70) ~[classes/:?]

ご覧のとおり、この問題に合格しない限り、試行を続けることはできません。

ところで:これは、私が使用するbuffer(1, TimeUnit.SECONDS)場合にのみ発生buffer(50)します。たとえば、使用すると機能します..

4

1 に答える 1