私は、リポジトリのサブセットにアクセスしたいという一般的なシナリオで作業しています。たとえば、「価格が 10 を超えるすべての注文を取得する」など、更新を維持する必要はありません。解決策を実装しましたが、2 つの問題があります (最後に記載)。
リポジトリのサブセットは、次のものと同等のもので実現できます
var expensiveOrders = Repository.GetOrders().Where(o => o.Price > 10);
ただし、これはIEnumerable
元のコレクションが更新されても更新されません。のハンドラを追加できますがCollectionChanged
、さらにサブセットにアクセスしたい場合はどうすればよいでしょうか?
var expensiveOrdersFromBob = expensiveOrders.Where(o => o.Name == Bob);
これについてもコレクションを変更する必要があります。ライブ更新の概念から Rx を考えるようになったので、それ自体を自動更新するアイテムと通知用の RX ストリームの両方を含むObservableCacheの構築に着手しました。ObservableCollection
(ストリームは、内部でキャッシュを更新するものでもあります。)
class ObservableCache<T> : IObservableCache<T>
{
private readonly ObservableCollection<T> _cache;
private readonly IObservable<Tuple<T, CRUDOperationType>> _updates;
public ObservableCache(IEnumerable<T> initialCache
, IObservable<Tuple<T, CRUDOperationType>> currentStream, Func<T, bool> filter)
{
_cache = new ObservableCollection<T>(initialCache.Where(filter));
_updates = currentStream.Where(tuple => filter(tuple.Item1));
_updates.Subscribe(ProcessUpdate);
}
private void ProcessUpdate(Tuple<T, CRUDOperationType> update)
{
var item = update.Item1;
lock (_cache)
{
switch (update.Item2)
{
case CRUDOperationType.Create:
_cache.Add(item);
break;
case CRUDOperationType.Delete:
_cache.Remove(item);
break;
case CRUDOperationType.Replace:
case CRUDOperationType.Update:
_cache.Remove(item); // ToDo: implement some key-based equality
_cache.Add(item);
break;
}
}
}
public ObservableCollection<T> Cache
{
get { return _cache; }
}
public IObservable<T> Updates
{
get { return _updates.Select(tuple => tuple.Item1); }
}
public IObservableCache<T> Where(Func<T, bool> predicate)
{
return new ObservableCache<T>(_cache, _updates, predicate);
}
}
その後、次のように使用できます。
var expensiveOrders = new ObservableCache<Order>(_orders
, updateStream
, o => o.Price > 10);
expensiveOrders.Updates.Subscribe
(o => Console.WriteLine("Got new expensive order: " + o));
_observableBoundToSomeCtrl = expensiveOrders.Cache;
var expensiveOrdersFromBob = expensiveOrders
.Where(o => o.Name == "Bob");
expensiveOrdersFromBob.Updates.Subscribe
(o => Console.WriteLine("Got new expensive order from Bob: " + o));
_observableBoundToSomeOtherCtrl = expensiveOrdersFromBob.Cache;
などなど、キャッシュをより狭いサブセットに投影し続けることができ、同期がずれることを心配する必要がないという考えです。それで、私の問題は何ですか?
- RX に本質的にコレクションを更新させることで、CRUD をなくすことができるかどうか疑問に思っています。たぶん、Selectなどで更新を「プロジェクト」しますか?
- 新しいキャッシュを構築している間にいくつかの更新を見逃す可能性があるという点で、repository-with-update パターンに固有の競合状態があります。何らかの順序付けが必要だと思いますが、それはすべての T オブジェクトに
ISequenceableItem
インターフェイスを実装させることを意味します。これを行うより良い方法はありますか?RX は、すべてのスレッド化を処理するので優れています。それを活かしたい。