サイズに基づいてキューから要素をバッチ処理するためにcyclops-reactを使用しようとしていますが、時間にも基づいているため、要素がない場合はブロックされません
機能が期待したものではないか、何か間違ったことをしている可能性があります
完全なコード (Groovy) は、プロデューサーが別のスレッドにある場合に次のようになります。
Queue<String> queue = QueueFactories.<String>unboundedQueue().build();
new Thread({
while (true) {
sleep(1000)
queue.offer("New message " + System.currentTimeMillis());
}
}).start();
StreamSource.futureStream(queue, new LazyReact(ThreadPools.queueCopyExecutor))
.groupedBySizeAndTime(10,500,TimeUnit.MILLISECONDS)
.forEach({i->println(i + " Batch Time: ${System.currentTimeMillis()}")})
出力は次のとおりです。
[New message 1487673650332, Batch Time: 1487673651356]
[New message 1487673651348, New message 1487673652352, Batch Time: 1487673653356]
[New message 1487673653355, New message 1487673654357, Batch Time: 1487673655362]
[New message 1487673655362, New message 1487673656364, Batch Time: 1487673657365]
しかし、提供される要素間の遅延は10秒ですが、バッチ処理は0.5秒ごとであるため、各バッチで1つの要素を期待していました
また、非同期ストリーム(Groovyコード)で試しました:
Queue<String> queue = QueueFactories.<String>unboundedQueue().build();
StreamSource.futureStream(queue, new LazyReact(ThreadPools.queueCopyExecutor))
.async()
.groupedBySizeAndTime(10, 500,TimeUnit.MILLISECONDS)
.peek({i->println(i + "Batch Time: ${System.currentTimeMillis()}")}).run();
while (true) {
queue.offer("New message " + System.currentTimeMillis());
sleep(1000)
}
繰り返しますが、2 秒ごとにのみバッチ処理し、バッチのタイムアウトが 0.5 秒であっても、バッチごとに 2 つの要素を待機することがあります。
[New message 1487673877780, Batch Time: 1487673878819]
[New message 1487673878811, New message 1487673879812, Batch Time: 1487673880815]
[New message 1487673880814, New message 1487673881819, Batch Time: 1487673882823]
[New message 1487673882823, New message 1487673883824, Batch Time: 1487673884828]
[New message 1487673884828, New message 1487673885831, Batch Time: 1487673886835]
非未来非遅延ストリームで 3 番目の実験を行いましたが、今回はうまくいきました。
Queue<String> queue = QueueFactories.<String>unboundedQueue().build();
new Thread({
while (true) {
sleep(1000)
queue.offer("New message " + System.currentTimeMillis());
}
}).start();
queue.stream()
.groupedBySizeAndTime(10,500,TimeUnit.MILLISECONDS)
.forEach({i->println(i + " Batch Time " + System.currentTimeMillis())})
結果:
[New message 1487673288017, New message 1487673289027, Batch Time , 1487673289055]
[New message 1487673290029, Batch Time , 1487673290029]
[New message 1487673291033, Batch Time , 1487673291033]
[New message 1487673292037, Batch Time , 1487673292037]
将来のストリームを使用すると、バッチ処理の動作が間違っているように見えるのはなぜですか?