イベントのストリームがあり、これらのイベントごとに promise を返す関数を呼び出したいと考えています。問題は、この関数が非常に高価であるため、一度に最大 n 個のイベントを処理したいということです。
この小石の図はおそらく間違っていますが、これは私が望むものです:
---x--x--xxxxxxx-------------x-------------> //Events
---p--p--pppp------p-p-p-----p-------------> //In Progress
-------d--d--------d-d-dd------dddd--------> //Promise Done
---1--21-2-34-----------3----4-3210-------- //QUEUE SIZE CHANGES
これは私がこれまでに持っているコードです:
var n = 4;
var inProgressCount = 0;
var events$ = Rx.Observable.fromEvent(produceEvent, 'click')
.map((ev) => new Date().getTime());
var inProgress$ = events$.controlled();
var done$ = inProgress$
.tap(() => inProgressCount++)
.flatMap((timestamp) => Rx.Observable.fromPromise(expensiveComputation(getRandomInt(1, 5) * 1000, timestamp)));
done$.subscribeOnNext((timestamp) => {
inProgressCount--;
inProgress$.request(Math.max(1, n - inProgressCount));
});
inProgress$.request(n);
このコードには 2 つの問題があります。
inProgressCount
副作用関数で更新される var を使用しています。done$ サブスクリプションは、制御されたストリームから複数のアイテムを要求したときに 1 回だけ呼び出されます。これにより、inProgressCount
変数が誤って更新され、最終的にキューが一度に 1 つに制限されます。
ここで動作していることがわかります: http://jsbin.com/wivehonifi/1/edit?js,console,output
質問:
- より良いアプローチはありますか?
inProgressCount
どうすれば変数を取り除くことができますか?複数のアイテムを要求するときに done$ サブスクリプションが 1 回しか呼び出されないのはなぜですか?
更新:
質問 #3 への回答: switchMap は flatMapLatest と同じであるため、最後の 1 つしか取得できませんでした。コードを switchMap ではなく flatMap に更新しました。