0

受信処理要求がありますが、共有リソースが枯渇するため、同時処理が多すぎないようにしたいと考えています。また、いくつかの一意のキーを共有するリクエストを同時に実行しないことをお勧めします。

def process(request: Request): Observable[Answer] = ???

requestsStream
  .groupBy(request => request.key)
  .flatMap(maxConcurrentProcessing, { case (key, requestsForKey) => 
      requestsForKey
         .flatMap(1, process)
  })

ただし、キーごとのオブザーバブルが完了しないため、上記は機能しません。これを達成する正しい方法は何ですか?

機能しないもの:

  .flatMap(maxConcurrentProcessing, { case (key, requestsForKey) => 
      // Take(1) unsubscribes after the first, causing groupBy to create a new observable, causing the next request to execute concurrently
      requestsForKey.take(1)
         .flatMap(1, process)
  })

 .flatMap(maxConcurrentProcessing, { case (key, requestsForKey) =>
      // The idea was to unsubscribe after 100 milliseconds to "free up" maxConcurrentProcessing
      // This discards all requests after the first if processing takes more than 100 milliseconds
      requestsForKey.timeout(100.millis, Observable.empty)
         .flatMap(1, process)
  })
4

1 に答える 1