私のアプリケーションでは、並行して動作する最大 N 個のコンシューマーとプロデューサーがいます。コンシューマーはプロデューサーからリソースを取得し、作業を行い、結果を に追加して、追加のupdateQueue
リソースを要求します。Producer は最初に利用可能なリソースをいくつか持っており、updateQueue
. 新しいリソースがコンシューマに発行される前に、利用可能なすべての更新を適用することが重要です。次のジェネレーターを使用して、消費者が要求を行うたびに「一括」更新を要求し、新しいリソース (消費者には必要ありませんが、後で他の消費者から要求される可能性があります) を確保してみましたticketQueue
:
def updatesOrFresh: Process[Task, Seq[OptimizerResult] \/ Unit] =
Process.await(updateQueue.size.continuous.take(1).runLast) {
case Some(size) =>
println(s"size: $size")
if (size == 0)
wye(updateQueue.dequeueAvailable, ticketQueue.dequeue)(wye.either)
else
updateQueue.dequeueAvailable.map(_.left[Unit])
}.take(1) ++ Process.suspend(updatesOrFresh)
機能しません - 最初に利用可能なリソースが から放出され、ticketQueue.dequeue
次に でブロックされているように見えますwye
。ログ:
size: 0
<<got ticket>>
size: 0
<<got ticket>>
size: 0 // it appears the updateQueue did not receive the consumer output yet, but I can live with that, it should grab an update from the wye anyway
<<blocks>>
で最初に使用可能なリソースが 2 つあったときticketQueue
。ただし、それを単に
val updatesOrFresh = wye(updateQueue.dequeueAvailable, ticketQueue.dequeue)(wye.either)
期待どおりに機能します(ただし、「新しいリソースを発行する前に更新を適用する」という保証はありません)。適切なタイミングで更新が確実に適用されるようにするにはどうすればよいですか?
編集:次のコードを使用して解決しました:
val updatesOrFresh: Process[Task, Seq[OptimizerResult] \/ Unit] =
Process.repeatEval {
for {
sizeOpt <- updateQueue.size.continuous.take(1).runLast
nextOpt <-
if (sizeOpt.getOrElse(???) == 0)
wye(updateQueue.dequeueAvailable, ticketQueue.dequeue)(wye.either).take(1).runLast
else
updateQueue.dequeueAvailable.map(_.left[Unit]).take(1).runLast
} yield nextOpt.getOrElse(???)
}
しかし、なぜオリジナルdef
が機能しなかったのかという疑問は残ります...