次のようなキューが与えられます。
val queue: Queue[Int] = async.boundedQueue[Int](1000)
このキューを取り出して、最大 100 のチャンクでダウンストリーム シンクにストリーミングしたいと考えています。
queue.dequeue.chunk(100).to(downstreamConsumer)
ある程度は機能しますが、メッセージが101個ある場合、キューは空になりません。別の 99 がプッシュされない限り、1 つのメッセージが残ります。ダウンストリーム プロセスが処理できる速さで、最大 100 まで、できるだけ多くのメッセージをキューから取り出したいと考えています。
利用可能な既存のコンビネータはありますか?