だから私はあなたが必要とすることのほとんどを行うと思ういくつかのコードを動作させました。基本的に、私は のzipAndContinue
ように動作する関数を作成しましたzip
が、基になるストリームの一部に発行するデータがある限り、アイテムを発行し続けます。この関数は、コールド オブザーバブルでのみ [簡単に] テストされています。
また、修正/拡張/編集は大歓迎です。
function zipAndContinue() {
// Augment each observable so it ends with null
const observables = Array.prototype.slice.call(arguments, 0).map(x => endWithNull(x));
const combined$ = Rx.Observable.combineLatest(observables);
// The first item from the combined stream is our first 'zipped' item
const first$ = combined$.first();
// We calculate subsequent 'zipped' item by only grabbing
// the items from the buffer that have all of the required updated
// items (remember, combineLatest emits each time any of the streams
// updates).
const subsequent$ = combined$
.skip(1)
.bufferWithCount(arguments.length)
.flatMap(zipped)
.filter(xs => !xs.every(x => x === null));
// We return the concatenation of these two streams
return first$.concat(subsequent$)
}
そして、使用されるユーティリティ関数は次のとおりです。
function endWithNull(observable) {
return Rx.Observable.create(observer => {
return observable.subscribe({
onNext: x => observer.onNext(x),
onError: x => observer.onError(x),
onCompleted: () => {
observer.onNext(null);
observer.onCompleted();
}
})
})
}
function zipped(xs) {
const nonNullCounts = xs.map(xs => xs.filter(x => x !== null).length);
// The number of streams that are still emitting
const stillEmitting = Math.max.apply(null, nonNullCounts);
if (stillEmitting === 0) {
return Rx.Observable.empty();
}
// Skip any intermittent results
return Rx.Observable.from(xs).skip(stillEmitting - 1);
}
使用例は次のとおりです。
const one$ = Rx.Observable.from([1, 2, 3, 4, 5, 6]);
const two$ = Rx.Observable.from(['one']);
const three$ = Rx.Observable.from(['a', 'b']);
zipAndContinue(one$, two$, three$)
.subscribe(x => console.log(x));
// >> [ 1, 'one', 'a' ]
// >> [ 2, null, 'b' ]
// >> [ 3, null, null ]
// >> [ 4, null, null ]
// >> [ 5, null, null ]
// >> [ 6, null, null ]
そして、これが js-fiddle です ([実行] をクリックしてコンソールを開くことができます): https://jsfiddle.net/ptx4g6wd/