1

並列キャッシュを構築しようとしています。要件は、一度に起動する必要があるn個の datacollector があることです。これらのデータ コレクターはそれぞれ、境界層 (これをサービス層と呼びます) に到達し、データを取得します。ただし、これは同じ要求 (WCF) 内にあるため、2 つのデータ コレクターがサービス層で同じメソッドを呼び出す必要がある場合、最初の要求が完了するまで 2 番目の要求を待機させたくありません。

これは、データ コレクターを構築する開発者に対して透過的に構築する必要があります (Unity Interception を使用して、このキャッシング アスペクトを挿入します)。

フローは次のようになります。Reactive 拡張機能は、この種の設計に適していますか? 私は過去に Rx を扱ったことがなく、開発の 10 日後にレンガの壁にぶつかりたくありません。それ以外の場合は、async、await、およびイベントの組み合わせもここでうまく機能する可能性があります。

編集: Rx を使用してこれを実装しました - マルチスレッドのコンテキストでうまく機能します。興味深いのは、tryGet の代わりに add を試したことです。(これは Unity インターセプト CallHandler です)

 /// <summary>
    /// Intercepts the calls and tries to retrieve from the cache
    /// </summary>
    class CacheCallHandler : ICallHandler
    {

        [Dependency]
        public ICache RequestCache { get; set; }

        public IMethodReturn Invoke(IMethodInvocation input, GetNextHandlerDelegate getNext)
        {
            IMethodReturn mesg = null;

            string cacheKey = CacheKeyGenerator.GetCacheKey(input);

            //create the task to retrieve the data
            var task = new Task<IMethodReturn>(() =>
            {
                return getNext()(input, getNext);
            });

            //make it observable
            var observableItem = task.ToObservable();

            //try to add it to the cache
            //we need to do this in the order of Add and then try to get, otherwise multiple thread might enter the same area
            if (RequestCache.TryAdd(cacheKey, observableItem))
            {
                //if the add succeeed, it means that we are responsible to starting this task
                task.Start();
            }
            else
            {
                if ( RequestCache.TryGetValue(cacheKey, out observableItem) )
                {
                    //do nothing, the observable item is already updated with the requried reference
                }
                else
                {
                    throw new CacheHandlerException("Could not add to cache AND could not retrieve from cache either. Something's wrong", input);
                }
            }

            //observe the return 
            if ( observableItem != null )
                mesg = observableItem.FirstOrDefault();

            if (mesg == null)
                throw new CacheHandlerException("Not return value found. this should not happen", input);

            return mesg;
        }


        /// <summary>
        /// Should always be the first to execute on the boundary
        /// </summary>
        public int Order
        {
            get { return 1; }
            set { ; }
        }
    }

ここに画像の説明を入力

4

3 に答える 3

4

https://github.com/reactiveui/ReactiveUI/blob/master/ReactiveUI/ObservableAsyncMRUCache.csはすでに、基本的にはまさにあなたが望むことを行っています。具体的には、同じコンテンツに対する 2 つのリクエストが「デバウンス」されています。コメントから:

/// ObservableAsyncMRUCache implements memoization for asynchronous or
/// expensive to compute methods. This memoization is an MRU-based cache
/// with a fixed limit for the number of items in the cache.     
///
/// This class guarantees that only one calculation for any given key is
/// in-flight at a time, subsequent requests will wait for the first one and
/// return its results (for example, an empty web image cache that receives
/// two concurrent requests for "Foo.jpg" will only issue one WebRequest -
/// this does not mean that a request for "Bar.jpg" will wait on "Foo.jpg").
///
/// Concurrency is also limited by the maxConcurrent parameter - when too
/// many in-flight operations are in progress, further operations will be
/// queued until a slot is available.
于 2012-10-01T01:03:02.877 に答える
1

はい、Rx はこれに最適です。

キーキャッシュをサポートするために、次の辞書を実装することを検討することをお勧めします。

Dictionary<K, AsyncSubject<V>>

データを非同期的に取得する部分では、サブジェクトをサブスクライブして結果を入力するだけで済みます。

于 2012-09-29T22:34:59.870 に答える
0

async私は解決策、具体的にAsyncLazy<T>は(私のブログから)を使用する解決策に傾倒します:

public sealed class MyCache<TKey, TValue>
{
  private readonly ConcurrentDictionary<TKey, AsyncLazy<TValue>> dictionary =
      new ConcurrentDictionary<TKey, AsyncLazy<TValue>>();

  private readonly Func<TKey, Task<TValue>> LookupAsync;

  public MyCache(Func<TKey, Task<TValue>> lookupAsync)
  {
    LookupAsync = lookupAsync;
  }

  public AsyncLazy<TValue> Get(TKey key)
  {
    return dictionary.GetOrAdd(key,
        key => new AsyncLazy<TValue>(() => lookupAsync(key)));
  }
}

これは、有効期限がないため、非常に単純な「キャッシュ」です。それはそのように使用することができます:

MyCache<string, MyResource> cache = new MyCache<string, MyResource>(async key =>
{
  MyResource ret = await DataLayer.GetResourceAsync(key);
  ...
  return ret;
});
MyResource resource = await cache.Get("key");

マルチスレッドの状況でGetOrAddは、余分なを作成する可能性がありますが、編集AsyncLazy<TValue>されることはないため、ごとに1回だけ呼び出されます。また、これは常にスレッドプールから呼び出されることに注意してください。awaitlookupAsyncTKeylookupAsync

PSあなたが行くならasync、あなたは私のasyncWCFの投稿が役に立つと思うかもしれません。

于 2012-09-30T01:19:47.263 に答える