1

私はメッセージのソースを持っていますObservable。すべてのメッセージに対して、別の を生成する HTTP 呼び出しを行いたいObservableので、それらを と組み合わせて、flatMapサブスクライバーにシンクします。このシナリオのコードは次のとおりです。

Rx.Observable.interval(1000)
.flatMap (tick) ->
  // returns an `Observable`
  loadMessages()
.flatMap (message) ->
  // also returns and `Observable`
  makeHttpRequest(message)
.subscribe (result) ->
  console.info "Processed: ", result

この例は coffeescript で書かれていますが、問題文は他の Rx 実装でも有効だと思います。

このアプローチで私が抱えている問題はloadMessages、大量のメッセージが非常に迅速に生成されることです。これは、非常に短い時間で大量の HTTP リクエストを行うことを意味します。これは私の状況では受け入れられないため、並列 HTTP リクエストの量を 10 程度に制限したいと考えています。つまり、HTTP リクエストを作成するときに、pipelene を調整するか、ある種の backpresure を適用したいと考えています。

Rx がこの種の状況に対処するための標準的なアプローチやベスト プラクティスはありますか?

現在、システムで処理中のメッセージが多すぎる場合に tick を無視する、非常に単純な (そしてかなり次善の) backpresure メカニズムを実装しました。次のようになります (簡易版)。

Rx.Observable.interval(1000)
.filter (tick) ->
  stats.applyBackpressureBasedOnTheMessagesInProcessing()
.do (tick) ->
  stats.messageIn()
.flatMap (tick) ->
  // returns an `Observable`
  loadMessages()
.flatMap (message) ->
  // also returns and `Observable`
  makeHttpRequest(message)
.do (tick) ->
  stats.messageOut()
.subscribe (result) ->
  console.info "Processed: ", result

しかし、これがもっとうまくできるかどうか、あるいは Rx がこの種の要件に対処するためのメカニズムをすでに備えているかどうかはわかりません。

4

3 に答える 3

2

これは厳密にはバックプレッシャではなく、並行性を制限しているだけです。これを行う簡単な方法は次のとおりです(TextAreaを介したコーディング、間違っている可能性のある構文は無視してください):

Rx.Observable.interval(1000)
    .flatMap (tick) ->
        // returns an `Observable`
        loadMessages()
    .map (message) ->
        // also returns and `Observable`, but only when
        // someone first subscribes to it
        Rx.Observable.defer ->
            makeHttpRequest(message)
    .merge 10 // at a time
    .subscribe (result) ->
        console.info "Processed: ", result

C# では、同等の考え方は ではなく、SelectManyですSelect(Defer(x)).Merge(n)。最大で進行中の Observable をMerge(int)サブスクライブし、残りは後でバッファリングします。nがある理由は、 がサブスクライブするDeferまで何もしないようにするためです。Merge(n)

于 2014-03-20T20:12:19.710 に答える
1

RXJS では、backpressure サブモジュールを使用できます

http://rxjs.codeplex.com/SourceControl/latest#src/core/backpressure/

disclaimer私は JS の RX バージョンを使用したことがありませんが、バックプレッシャを実装する標準的な方法を求めており、コア ライブラリはそれをサポートしているようです。RX c# にはまだこのサポートがありません。理由がわからない。

于 2014-03-18T05:15:14.983 に答える