6

RxJs で Time Expiry キャッシュを実装したいと思います。「通常の」キャッシュの例を次に示します。

//let this represents "heavy duty job"
var data = Rx.Observable.return(Math.random() * 1000).delay(2000);

//and we want to cache result
var cachedData = new Rx.AsyncSubject();
data.subscribe(cachedData);

cachedData.subscribe(function(data){
    //after 2 seconds, result is here and data is cached
    //next subscribe returns immediately data
    cachedData.subscribe(function(data2){ /*this is "instant"*/ });
});

初めてsubscribeonを呼び出すと、「重作業」が呼び出され、2 秒後に結果が( ) に保存されます。他の後続のonは、保存された結果をすぐに返します (したがって、キャッシュの実装)。 cachedDatacachedDataAsyncSubjectsubscribecachedData

私が達成したいのcachedDataは、有効な期間内にこれを「スパイス」することです。その時間が経過したら、新しいデータに対して「ヘビーデューティジョブ」を再実行し、これを新しい時間に再度キャッシュします期間など...

望ましい動作:

//pseudo code
cachedData.youShouldExpireInXSeconds(10);


//let's assume that all code is sequential from here

//this is 1.st run
cachedData.subscribe(function (data) {
    //this first subscription actually runs "heavy duty job", and
    //after 2 seconds first result data is here
});

//this is 2.nd run, just after 1.st run finished
cachedData.subscribe(function (data) {
    //this result is cached
});

//15 seconds later
// cacheData should expired
cachedData.subscribe(function (data) {
    //i'm expecting same behaviour as it was 1.st run:
    // - this runs new "heavy duty job"
    // - and after 2 seconds we got new data result
});


//....
//etc

私は Rx(Js) が初めてで、クールダウンを使用してこのホット オブザーバブルを実装する方法がわかりません。

4

2 に答える 2

6

欠けているのは、一定期間後にcachedData新しいものに置き換えるタスクをスケジュールすることだけです。新しいメソッドAsyncSubjectとしてそれを行う方法は次のとおりです。Rx.Observable

Rx.Observable.prototype.cacheWithExpiration = function(expirationMs, scheduler) {
    var source = this,
        cachedData = undefined;

    // Use timeout scheduler if scheduler not supplied
    scheduler = scheduler || Rx.Scheduler.timeout;

    return Rx.Observable.create(function (observer) {

        if (!cachedData) {
            // The data is not cached.
            // create a subject to hold the result
            cachedData = new Rx.AsyncSubject();

            // subscribe to the query
            source.subscribe(cachedData);

            // when the query completes, start a timer which will expire the cache
            cachedData.subscribe(function () {
                scheduler.scheduleWithRelative(expirationMs, function () {
                    // clear the cache
                    cachedData = undefined;
                });
            });
        }

        // subscribe the observer to the cached data
        return cachedData.subscribe(observer);
    });
};

使用法:

// a *cold* observable the issues a slow query each time it is subscribed
var data = Rx.Observable.return(42).delay(5000);

// the cached query
var cachedData = data.cacheWithExpiration(15000);

// first observer must wait
cachedData.subscribe();

// wait 3 seconds

// second observer gets result instantly
cachedData.subscribe();

// wait 15 seconds

// observer must wait again
cachedData.subscribe();
于 2014-10-21T13:54:34.987 に答える