私はメッセージのソースを持っていますObservable
。すべてのメッセージに対して、別の を生成する HTTP 呼び出しを行いたいObservable
ので、それらを と組み合わせて、flatMap
サブスクライバーにシンクします。このシナリオのコードは次のとおりです。
Rx.Observable.interval(1000)
.flatMap (tick) ->
// returns an `Observable`
loadMessages()
.flatMap (message) ->
// also returns and `Observable`
makeHttpRequest(message)
.subscribe (result) ->
console.info "Processed: ", result
この例は coffeescript で書かれていますが、問題文は他の Rx 実装でも有効だと思います。
このアプローチで私が抱えている問題はloadMessages
、大量のメッセージが非常に迅速に生成されることです。これは、非常に短い時間で大量の HTTP リクエストを行うことを意味します。これは私の状況では受け入れられないため、並列 HTTP リクエストの量を 10 程度に制限したいと考えています。つまり、HTTP リクエストを作成するときに、pipelene を調整するか、ある種の backpresure を適用したいと考えています。
Rx がこの種の状況に対処するための標準的なアプローチやベスト プラクティスはありますか?
現在、システムで処理中のメッセージが多すぎる場合に tick を無視する、非常に単純な (そしてかなり次善の) backpresure メカニズムを実装しました。次のようになります (簡易版)。
Rx.Observable.interval(1000)
.filter (tick) ->
stats.applyBackpressureBasedOnTheMessagesInProcessing()
.do (tick) ->
stats.messageIn()
.flatMap (tick) ->
// returns an `Observable`
loadMessages()
.flatMap (message) ->
// also returns and `Observable`
makeHttpRequest(message)
.do (tick) ->
stats.messageOut()
.subscribe (result) ->
console.info "Processed: ", result
しかし、これがもっとうまくできるかどうか、あるいは Rx がこの種の要件に対処するためのメカニズムをすでに備えているかどうかはわかりません。