1

ポイントに到達するためにボイラープレートを削除しました

// a.js

// My observables from stream and event
this.a = Rx.Node.fromStream(this.aStream());
this.itemSource = Rx.Observable.fromEvent(ee, 'addItem');

// Zip 'em
this.itemcombo = Rx.Observable.zip(this.a, this.itemSource, function (s1, s2) {
    return {item: s2, a: s1.toString()};
});

// Streams the lowercase alphabet
rb.prototype.aStream = function aStream() {
var rs = Readable();

var c = 97;
rs._read = function () {
    rs.push(String.fromCharCode(c++));
    console.log('Hit!');
    if (c > 'z'.charCodeAt(0)) {
        rs.push(null);
    }
};

return rs;
};

// b.js(上記でエクスポートしたモジュールが必要です)

rb.enqueue('a'); // The method simply does an ee.emit('addItem', ...)  in the module to trigger the itemSource observable

私が期待していたもの:

{item: 'a', a: 'a'}コンソールに出力

どうしたの:

Hit!24回前に印刷されました{item: 'a', a: 'a'}。これは、zipが からすべての値を取得しaStream、それらをバッファリングしてから、本来の処理を実行したことを意味します。

zip同じ機能を遅延して提供するにはどうすればよいですか? 私の目標は、無限のストリーム/オブザーバブルを使用し、有限 (非同期) のもので圧縮することです。

編集

ランナブルを介して参照/編集: RX Zip test 編集 2コードは回答に基づいて更新されました -> 現在は出力されていません。

4

1 に答える 1

1

zipは確かに怠け者です。いずれかが新しい値を生成するたびに、サブスクライブしてその作業を実行しaます。b

あなたの問題は、サブスクライブするとfromStreamすぐにすべての値を同期的に発行していることです。zipこれは、あなたのカスタムReadableが常に「利用可能なデータは他にもあります!」と言っているために発生しています。

非同期Readableにすると、目的の動作が得られます。

このようなことを試してください(テストされていません)

var rs = Readable();
var subscription = null;
rs._read = function () {
    if (!subscription) {
        // produce the values once per second
        subscription = Rx.Observable
            .generateWithRelativeTime(
                97, // start value
                function (c) { return c > 'z'.charCodeAt(0); }, // end condition
                function (c) { return c + 1; }, // step function
                function (c) { return String.fromCharCode(c); }, // result selector
                function () { return 1000; }) // 1000ms between values
            .subscribe(
                function (s) {
                    rs.push(s);
                    console.log("Hit!");
                },
                function (error) { rs.push(null); },
                function () { rs.push(null); });
    }
};
于 2014-08-27T14:16:11.253 に答える