0

サイズに基づいてキューから要素をバッチ処理するためにcyclops-reactを使用しようとしていますが、時間にも基づいているため、要素がない場合はブロックされません

機能が期待したものではないか、何か間違ったことをしている可能性があります

完全なコード (Groovy) は、プロデューサーが別のスレッドにある場合に次のようになります。

            Queue<String> queue = QueueFactories.<String>unboundedQueue().build();
    new Thread({
        while (true) {
            sleep(1000)
            queue.offer("New message " + System.currentTimeMillis());
        }
    }).start();

    StreamSource.futureStream(queue, new LazyReact(ThreadPools.queueCopyExecutor))
            .groupedBySizeAndTime(10,500,TimeUnit.MILLISECONDS)
            .forEach({i->println(i + " Batch Time: ${System.currentTimeMillis()}")})

出力は次のとおりです。

    [New message 1487673650332,  Batch Time: 1487673651356]
    [New message 1487673651348, New message 1487673652352,  Batch Time: 1487673653356]
    [New message 1487673653355, New message 1487673654357,  Batch Time: 1487673655362]
    [New message 1487673655362, New message 1487673656364,  Batch Time: 1487673657365]

しかし、提供される要素間の遅延は10秒ですが、バッチ処理は0.5秒ごとであるため、各バッチで1つの要素を期待していました

また、非同期ストリーム(Groovyコード)で試しました:

    Queue<String> queue = QueueFactories.<String>unboundedQueue().build();
    StreamSource.futureStream(queue, new LazyReact(ThreadPools.queueCopyExecutor))
            .async()
            .groupedBySizeAndTime(10, 500,TimeUnit.MILLISECONDS)
            .peek({i->println(i + "Batch Time: ${System.currentTimeMillis()}")}).run();

    while (true) {
        queue.offer("New message " + System.currentTimeMillis());
        sleep(1000)
    }

繰り返しますが、2 秒ごとにのみバッチ処理し、バッチのタイムアウトが 0.5 秒であっても、バッチごとに 2 つの要素を待機することがあります。

    [New message 1487673877780, Batch Time: 1487673878819]
    [New message 1487673878811, New message 1487673879812, Batch Time: 1487673880815]
    [New message 1487673880814, New message 1487673881819, Batch Time: 1487673882823]
    [New message 1487673882823, New message 1487673883824, Batch Time: 1487673884828]
    [New message 1487673884828, New message 1487673885831, Batch Time: 1487673886835]

非未来非遅延ストリームで 3 番目の実験を行いましたが、今回はうまくいきました。

    Queue<String> queue = QueueFactories.<String>unboundedQueue().build();
    new Thread({
        while (true) {
            sleep(1000)
            queue.offer("New message " + System.currentTimeMillis());
        }
    }).start();

    queue.stream()
            .groupedBySizeAndTime(10,500,TimeUnit.MILLISECONDS)
            .forEach({i->println(i + " Batch Time " + System.currentTimeMillis())})

結果:

    [New message 1487673288017, New message 1487673289027,  Batch Time , 1487673289055]
    [New message 1487673290029,  Batch Time , 1487673290029]
    [New message 1487673291033,  Batch Time , 1487673291033]
    [New message 1487673292037,  Batch Time , 1487673292037]

将来のストリームを使用すると、バッチ処理の動作が間違っているように見えるのはなぜですか?

4

1 に答える 1