0

私はSilverlightのReactiveフレームワークを使用しており、次のことを実現したいと考えています。

私は、MSEntLibで利用可能なキャッシングフレームワークも利用するSilverlightクライアント用の典型的なデータプロバイダーを作成しようとしています。シナリオでは、WCFデータクライアントにアクセスする前に、キーと値のペアのキャッシュをチェックインする必要があります。

Rx拡張機能Ambを使用することで、キャッシュまたはWCFデータクライアントのどちらか早い方からデータをプルできますが、値がキャッシュにある場合、WCFクライアントによる呼び出しの実行を停止するにはどうすればよいですか?

また、競合状態についても検討したいと思います。たとえば、最初のサブスクライバーがデータを要求し、プロバイダーがWCFデータクライアント(非同期)からデータをフェッチしている場合、後続の非同期要求が同じことを行わないようにするにはどうすればよいですか(この段階では、キャッシュはまだ設定されていません)。

4

3 に答える 3

1

私はまったく同じ問題を抱えていました。次の署名を使用した拡張メソッドで解決しました。

IObservable<R> FromCacheOrFetch<T, R>(
    this IObservable<T> source,
    Func<T, R> cache,
    Func<IObservable<T>, IObservable<R>> fetch,
    IScheduler scheduler) where R : class

事実上、これが行ったことは、ソースのオブザーバブルを取り込み、各入力値をその出力値と一致させるオブザーバブルを返すことでした。

各出力値を取得するには、最初にキャッシュをチェックします。値がキャッシュに存在する場合は、それを使用しました。そうでない場合は、キャッシュにない値に対してのみfetch関数が起動します。すべての値がキャッシュにある場合、関数はスピンアップされないため、サービス接続のセットアップペナルティなどはありません。fetch

コードを紹介しますが、これはモナドを使用する拡張メソッドのわずかに異なるバージョンに基づいているMaybe<T>ため、実装をいじる必要があるかもしれません。

ここにあります:

    public static IObservable<R> FromCacheOrFetch<T, R>(this IObservable<T> source, Func<T, R> cache, Func<IObservable<T>, IObservable<R>> fetch, IScheduler scheduler)
        where R : class
    {
        return source.FromCacheOrFetch<T, R>(t => cache(t).ToMaybe(null), fetch, scheduler);
    }

    public static IObservable<R> FromCacheOrFetch<T, R>(this IObservable<T> source, Func<T, Maybe<R>> cache, Func<IObservable<T>, IObservable<R>> fetch, IScheduler scheduler)
    {
        var results = new Subject<R>();

        var disposables = new CompositeDisposable();

        var loop = new EventLoopScheduler();
        disposables.Add(loop);

        var sourceDone = false;
        var pairsDone = true;
        var exception = (Exception)null;

        var fetchIn = new Subject<T>();
        var fetchOut = (IObservable<R>)null;
        var pairs = (IObservable<KeyValuePair<int, R>>)null;

        var lookup = new Dictionary<T, int>();
        var list = new List<Maybe<R>>();
        var cursor = 0;

        Action checkCleanup = () =>
        {
            if (sourceDone && pairsDone)
            {
                if (exception == null)
                {
                    results.OnCompleted();
                }
                else
                {
                    results.OnError(exception);
                }
                loop.Schedule(() => disposables.Dispose());
            }
        };

        Action dequeue = () =>
        {
            while (cursor != list.Count)
            {
                var mr = list[cursor];
                if (mr.HasValue)
                {
                    results.OnNext(mr.Value);
                    cursor++;
                }
                else
                {
                    break;
                }
            }
        };

        Action<KeyValuePair<int, R>> nextPairs = kvp =>
        {
            list[kvp.Key] = Maybe<R>.Something(kvp.Value);
            dequeue();
        };

        Action<Exception> errorPairs = ex =>
        {
            fetchIn.OnCompleted();
            pairsDone = true;
            exception = ex;
            checkCleanup();
        };

        Action completedPairs = () =>
        {
            pairsDone = true;
            checkCleanup();
        };

        Action<T> sourceNext = t =>
        {
            var mr = cache(t);
            list.Add(mr);
            if (mr.IsNothing)
            {
                lookup[t] = list.Count - 1;
                if (fetchOut == null)
                {
                    pairsDone = false;
                    fetchOut = fetch(fetchIn.ObserveOn(Scheduler.ThreadPool));
                    pairs = fetchIn.Select(x => lookup[x]).Zip(fetchOut, (i, r2) => new KeyValuePair<int, R>(i, r2));
                    disposables.Add(pairs.ObserveOn(loop).Subscribe(nextPairs, errorPairs, completedPairs));
                }
                fetchIn.OnNext(t);
            }
            else
            {
                dequeue();
            }
        };

        Action<Exception> errorSource = ex =>
        {
            sourceDone = true;
            exception = ex;
            fetchIn.OnCompleted();
            checkCleanup();
        };

        Action completedSource = () =>
        {
            sourceDone = true;
            fetchIn.OnCompleted();
            checkCleanup();
        };

        disposables.Add(source.ObserveOn(loop).Subscribe(sourceNext, errorSource, completedSource));

        return results.ObserveOn(scheduler);
    }

使用例は次のようになります。

フェッチしたいインデックスのソースがあります。

IObservable<X> source = ...

キャッシュから値を取得できる関数と、値を入れることができるアクションがあります(両方ともスレッドセーフである必要があります)。

Func<X, Y> getFromCache = x => ...;
Action<X, Y> addToCache = (x, y) => ...;

次に、データベースまたはサービスからデータを取得するための実際の呼び出しがあります。

Func<X, Y> getFromService = x => ...;

fetch次に、次のように定義できます。

Func<IObservable<X>, IObservable<Y>> fetch =
    xs => xs.Select(x =>
    {
        var y = getFromService(x);
        addToCache(x, y);
        return y;
    });

そして最後に、次のように呼び出すことでクエリを実行できます。

IObservable<Y> results =
    source.FromCacheOrFetch(
        getFromCache,
        fetch,
        Scheduler.ThreadPool);

もちろん、計算を実行するには、結果をサブスクライブする必要があります。

于 2012-05-21T09:02:25.360 に答える
0

Amb毎回キャッシュとサービスの両方に影響を与えるため、明らかに正しい方法ではありません。キャッシュが欠落している場合、EntLibは何を返しますか?

Observable.Timeoutこれは合理的な代替手段であることに注意してください。

cache(<paramters>).Timeout(TimeSpan.FromSeconds(1), service<paramters>);

ただし、代わりにEntLibからの戻りを処理し、代わりに適切に動作する場合は、タイムアウトすることはお勧めできません。

これが必ずしもリアクティブエクステンションの問題である理由がわかりません。

于 2012-05-21T07:42:54.697 に答える
0

おそらく@Enigmativityのソリューションよりも完全には機能していない単純なアプローチは、次のようなものになる可能性があります。

public IObservable<T> GetCachedValue<TKey, TResult>(TKey key, Func<TKey, TResult> getFromCache, Func<TKey, TResult> getFromSource)
{
    return getFromCache(<key>).Concat(getFromSource(<key>).Take(1);
}

これは大まかに形成されたアイデアであり、次のように追加する必要があります。

  • アイテムをキャッシュに追加するメカニズム、またはgetFromSourceが結果をキャッシュすると想定するメカニズム
  • 同じキャッシュされていないキーのソースでの複数のヒットを防ぐためのある種のスレッドセーフ(必要な場合)
  • アイテムがキャッシュになかった場合、getFromCacheはObservable.Empty()を返す必要があります。

しかし、単純なものが必要な場合は、開始するのに悪い場所ではありません。

于 2015-07-07T04:46:02.570 に答える