0

RxJava 1 / RxScala では、次の状況で観察可能なソースをどのように抑制/バックプレッシャできますか?

def fast: Observable[Foo] // Supports backpressure

def afterExpensiveOp: Observable[Bar] = 
    fast.flatMap(foo => Observable.from(expensiveOp(foo))

// Signature and behavior is out of my control
def expensiveOp(foo: Foo)(implicit ec: ExecutionContext): Future[Bar] = {
   if(noResources()) Future.failed(new OutOfResourcesException())
   else Future { Bar() }
}

可能な解決策は、ブロックすることです。これは機能しますが、それは非常に洗練されておらず、複数の同時リクエストを防ぎます:

def afterExpensiveOp: Observable[Bar] = fast.flatMap(foo => 
   Observable.just(Observable.from(expensiveOp(foo)).toBlocking.head)
)
4

1 に答える 1

0

flatMap には、同時サブスクライバーの数を制限するパラメーターがあります。この flatMap を使用すると、バックプレッシャーが処理されます。

def afterExpensiveOp = fast.flatMap(safeNumberOfConccurrentExpensiveOps, x => Observable.from(expensiveOp(x)))
于 2016-11-15T09:25:37.643 に答える