4

私は、リポジトリのサブセットにアクセスしたいという一般的なシナリオで作業しています。たとえば、「価格が 10 を超えるすべての注文を取得する」など、更新を維持する必要はありません。解決策を実装しましたが、2 つの問題があります (最後に記載)。

リポジトリのサブセットは、次のものと同等のもので実現できます

var expensiveOrders = Repository.GetOrders().Where(o => o.Price > 10);

ただし、これはIEnumerable元のコレクションが更新されても更新されません。のハンドラを追加できますがCollectionChanged、さらにサブセットにアクセスしたい場合はどうすればよいでしょうか?

var expensiveOrdersFromBob = expensiveOrders.Where(o => o.Name == Bob);

これについてもコレクションを変更する必要があります。ライブ更新の概念から Rx を考えるようになったので、それ自体を自動更新するアイテムと通知用の RX ストリームの両方を含むObservableCacheの構築に着手しました。ObservableCollection(ストリームは、内部でキャッシュを更新するものでもあります。)

class ObservableCache<T> : IObservableCache<T>
{
    private readonly ObservableCollection<T> _cache;
    private readonly IObservable<Tuple<T, CRUDOperationType>> _updates;

    public ObservableCache(IEnumerable<T> initialCache
       , IObservable<Tuple<T, CRUDOperationType>> currentStream, Func<T, bool> filter)
        {
          _cache = new ObservableCollection<T>(initialCache.Where(filter));
          _updates = currentStream.Where(tuple => filter(tuple.Item1));
          _updates.Subscribe(ProcessUpdate);
        }

    private void ProcessUpdate(Tuple<T, CRUDOperationType> update)
    {
        var item = update.Item1;
        lock (_cache)
        {
            switch (update.Item2)
            {
                case CRUDOperationType.Create:
                    _cache.Add(item);
                    break;
                case CRUDOperationType.Delete:
                    _cache.Remove(item);
                    break;
                case CRUDOperationType.Replace:
                case CRUDOperationType.Update:
                    _cache.Remove(item); // ToDo: implement some key-based equality
                    _cache.Add(item);
                    break;
            }
        }
    }

    public ObservableCollection<T> Cache
    {
        get { return _cache; }
    }

    public IObservable<T> Updates
    {
        get { return _updates.Select(tuple => tuple.Item1); }
    }

    public IObservableCache<T> Where(Func<T, bool> predicate)
    {
        return new ObservableCache<T>(_cache, _updates, predicate);
    }
}

その後、次のように使用できます。

var expensiveOrders = new ObservableCache<Order>(_orders
                                                 , updateStream
                                                 , o => o.Price > 10);
expensiveOrders.Updates.Subscribe
     (o => Console.WriteLine("Got new expensive order: " + o));
_observableBoundToSomeCtrl = expensiveOrders.Cache;

var expensiveOrdersFromBob = expensiveOrders
                             .Where(o => o.Name == "Bob");
expensiveOrdersFromBob.Updates.Subscribe
         (o => Console.WriteLine("Got new expensive order from Bob: " + o));
_observableBoundToSomeOtherCtrl = expensiveOrdersFromBob.Cache;

などなど、キャッシュをより狭いサブセットに投影し続けることができ、同期がずれることを心配する必要がないという考えです。それで、私の問題は何ですか?

  1. RX に本質的にコレクションを更新させることで、CRUD をなくすことができるかどうか疑問に思っています。たぶん、Selectなどで更新を「プロジェクト」しますか?
  2. 新しいキャッシュを構築している間にいくつかの更新を見逃す可能性があるという点で、repository-with-update パターンに固有の競合状態があります。何らかの順序付けが必要だと思いますが、それはすべての T オブジェクトにISequenceableItemインターフェイスを実装させることを意味します。これを行うより良い方法はありますか?RX は、すべてのスレッド化を処理するので優れています。それを活かしたい。
4

4 に答える 4

1

異なる方法ではあるが、あなたが望むものを達成するこれらの2つのプロジェクトを見てください:

https://github.com/RolandPheasant/DynamicData

https://bitbucket.org/mendelmonteiro/reactivetables [免責事項: これは私のプロジェクトです]

于 2015-06-17T13:46:27.517 に答える
1

次のような定義があるとします。

class SetOp<T>
{
    public T Value { get; private set; }
    public bool Include { get; private set; }
    public SetOp(T value, bool include)
    {
        Value = value;
        Include = include;
    }
}

を使用するObservable.Scanと、System.Collections.Immutable次のようなことができます。

IObservable<SetOp<int>> ops = ...;
IImmutableSet<int> empty = ImmutableSortedSet<int>.Empty;
var observableSet = ops
    .Scan(empty, (s, op) => op.Include ? s.Add(op.Value) : s.Remove(op.Value))
    .StartWith(empty);

ここでの重要なトリックは、不変のコレクション型を使用することです。 のオブザーバーはobservableSet、値が不変であるため、プッシュされた値で好きなことを行うことができます。連続する値の間で設定されたデータ構造の大部分を再利用するため、効率的です。

opsストリームと対応するの例を次に示しobservableSetます。

ops       observableSet
--------  ------------------
          {}
Add 7     {7}
Add 4     {4,7}
Add 5     {4,5,7}
Add 6     {4,5,6,7}
Remove 5  {4,6,7}
Add 8     {4,6,7,8}
Remove 4  {6,7,8}
于 2015-06-17T18:54:37.713 に答える
0

_cache内でロックする必要はありませんProcessUpdate。ソース オブザーバブルcurrentStreamが Rx ガイドラインを尊重している場合、一度に 1 つの呼び出し内にのみいることが保証されますOnNext。つまり、前の値をまだ処理している間は、ストリームから別の値を受け取ることはありません。

競合状態を解決する唯一の確実な方法は、updateStreamがデータの生成を開始する前にキャッシュを作成することです。

Extensions for Reactive Extensions (Rxx) をご覧になることをお勧めします。Dave は、UI コントロールを監視可能なデータにバインドするためのユーティリティを数多く作成したと思います。ドキュメントはまばらです。あなたがしていることに何かがあるかどうかはわかりません。

于 2013-04-15T20:31:07.917 に答える