ポイントに到達するためにボイラープレートを削除しました
// a.js
// My observables from stream and event
this.a = Rx.Node.fromStream(this.aStream());
this.itemSource = Rx.Observable.fromEvent(ee, 'addItem');
// Zip 'em
this.itemcombo = Rx.Observable.zip(this.a, this.itemSource, function (s1, s2) {
return {item: s2, a: s1.toString()};
});
// Streams the lowercase alphabet
rb.prototype.aStream = function aStream() {
var rs = Readable();
var c = 97;
rs._read = function () {
rs.push(String.fromCharCode(c++));
console.log('Hit!');
if (c > 'z'.charCodeAt(0)) {
rs.push(null);
}
};
return rs;
};
// b.js
(上記でエクスポートしたモジュールが必要です)
rb.enqueue('a'); // The method simply does an ee.emit('addItem', ...) in the module to trigger the itemSource observable
私が期待していたもの:
{item: 'a', a: 'a'}
コンソールに出力
どうしたの:
Hit!
24回前に印刷されました{item: 'a', a: 'a'}
。これは、zip
が からすべての値を取得しaStream
、それらをバッファリングしてから、本来の処理を実行したことを意味します。
zip
同じ機能を遅延して提供するにはどうすればよいですか? 私の目標は、無限のストリーム/オブザーバブルを使用し、有限 (非同期) のもので圧縮することです。
編集
ランナブルを介して参照/編集: RX Zip test 編集 2コードは回答に基づいて更新されました -> 現在は出力されていません。