3

イベントのストリームがあり、これらのイベントごとに promise を返す関数を呼び出したいと考えています。問題は、この関数が非常に高価であるため、一度に最大 n 個のイベントを処理したいということです。

この小石の図はおそらく間違っていますが、これは私が望むものです:

---x--x--xxxxxxx-------------x------------->  //Events
---p--p--pppp------p-p-p-----p------------->  //In Progress
-------d--d--------d-d-dd------dddd-------->  //Promise Done

---1--21-2-34-----------3----4-3210--------   //QUEUE SIZE CHANGES

これは私がこれまでに持っているコードです:

var n = 4;
var inProgressCount = 0;

var events$ = Rx.Observable.fromEvent(produceEvent, 'click')
  .map((ev) => new Date().getTime());

var inProgress$ = events$.controlled();

var done$ = inProgress$      
  .tap(() => inProgressCount++)
  .flatMap((timestamp) => Rx.Observable.fromPromise(expensiveComputation(getRandomInt(1, 5) * 1000, timestamp)));

done$.subscribeOnNext((timestamp) => {
  inProgressCount--;
  inProgress$.request(Math.max(1, n - inProgressCount));
});

inProgress$.request(n);

このコードには 2 つの問題があります。

  1. inProgressCount副作用関数で更新される var を使用しています。
  2. done$ サブスクリプションは、制御されたストリームから複数のアイテムを要求したときに 1 回だけ呼び出されます。これにより、inProgressCount変数が誤って更新され、最終的にキューが一度に 1 つに制限されます。

ここで動作していることがわかります: http://jsbin.com/wivehonifi/1/edit?js,console,output

質問:

  1. より良いアプローチはありますか?
  2. inProgressCountどうすれば変数を取り除くことができますか?
  3. 複数のアイテムを要求するときに done$ サブスクリプションが 1 回しか呼び出されないのはなぜですか?

更新:
質問 #3 への回答: switchMap は flatMapLatest と同じであるため、最後の 1 つしか取得できませんでした。コードを switchMap ではなく flatMap に更新しました。

4

1 に答える 1

4

実際には、背圧を使用する必要はまったくありません。これを行うオペレーターが呼び出さflatMapWithMaxConcurrentれます。これは基本的に呼び出しのエイリアス.map().merge(concurrency)であり、一度に最大数のストリームしか実行できません。

ここで jsbin を更新しました: http://jsbin.com/weheyuceke/1/edit?js,output

ただし、以下の重要な部分に注釈を付けました。

const concurrency = 4;

var done$ = events$
  //Only allows a maximum number of items to be subscribed to at a time
  .flatMapWithMaxConcurrent(concurrency, 
    ({timestamp}) =>   
      //This overload of `fromPromise` defers the execution of the lambda
      //until subscription                    
      Rx.Observable.fromPromise(() => { 
        //Notify the ui that this task is in progress                                 
        updatePanelAppend(inProgress, timestamp);
        removeFromPanel(pending, timestamp);
        //return the task
        return expensiveComputation(getRandomInt(1, 5) * 1000, timestamp)
     }));
于 2016-07-27T20:42:59.050 に答える