1

次のようなキューが与えられます。

val queue: Queue[Int] = async.boundedQueue[Int](1000)

このキューを取り出して、最大 100 のチャンクでダウンストリーム シンクにストリーミングしたいと考えています。

queue.dequeue.chunk(100).to(downstreamConsumer) 

ある程度は機能しますが、メッセージが101個ある場合、キューは空になりません。別の 99 がプッシュされない限り、1 つのメッセージが残ります。ダウンストリーム プロセスが処理できる速さで、最大 100 まで、できるだけ多くのメッセージをキューから取り出したいと考えています。

利用可能な既存のコンビネータはありますか?

4

2 に答える 2

0

このため、キューからデキューするときにキューのサイズを監視する必要がある場合があります。次に、サイズが 0 に達すると、それ以上の要素を待つことはありません。実際elastic、キューのサイズに基づいてバッチ処理のサイジングを実装できます。すなわち:

val q = async.unboundedQueue[String]

val deq:Process[Task,(String,Int)] = q.dequeue zip q.size
val elasticChunk: Process1[(String,Int), Vector[String]] = ???
val downstreamConsumer : Sink[Task,Vector[String]] = ???

deq.pipe(elasticChunk) to downstreamConsumer
于 2015-08-29T04:24:52.810 に答える