4

.Net用の新しいReactiveExtensionsフレームワークを学習しようとしていますが、これは私のアプリケーションに最適なソリューションのようです。例を研究した後(そしてLINQはまだかなり弱い)、RXフレームワークを活用して以下のタスクを実行する方法を理解するのに苦労しています。

目標は、カスタムデータソースとGUIの間に構成可能なイベント「リレー」を作成することです。リレーはLINQを使用して着信イベントをテストおよびフィルタリングし、次の時間間隔を待機している間に修飾されたイベントをリストにキャッシュしてから、GUIスレッドに同期し、受信した順序でイベントを再生します。

RXを使用して、カスタムデータソースの値を追加、変更、および削除するためのイベントなど、複数のイベントのキャッシュ、フィルタリング、およびリレーを調整するにはどうすればよいですか?

これは多くのことを求めているかもしれませんが、この問題に取り組む方法についてのガイダンスは非常にありがたいです。以下のサンプルコードを参照してください...

public delegate void EventDelegateAdd(Thing thing);
public delegate void EventDelegateChange(Thing thing);
public delegate void EventDelegateRemove(Thing thing);

public delegate void EventDelegateBulkChangesStart();
public delegate void EventDelegateBulkChangesEnd();

// The "Things" that are stored in MyCustomDataSource

public class Thing
{
    public string Key { get; set; }
    public string Title { get; set; }
    public object OtherStuff { get; set; }
}

// A custom observable data source with events that indicate when Things are
// added, changed, or removed.

public class MyCustomDataSource
{
    public event EventDelegateAdd AddingThing;
    public event EventDelegateChange ChangingThing;
    public event EventDelegateRemove RemovingThing;

    // The rest of the class that manages the database of Things...
}

// This class forms a configurable event bridge between the MyCustomDataSource and
// the GUI.  The goal is to cache, filter, and throttle the events so that the GUI
// updates only occasionally with bulk changes that are relevant for that control.

public class MyEventCachingBridge
{
    private MyCustomDataSource mSource;

    public event EventDelegateAdd AddingThing;
    public event EventDelegateChange ChangingThing;
    public event EventDelegateRemove RemovingThing;

    public event EventDelegateBulkChangesStart BulkChangesStart;
    public event EventDelegateBulkChangesEnd BulkChangesEnd;


    public MyEventCachingBridge(MyCustomDataSource source, int eventRelayInterval)
    {
        mSource = source;

        // Magical Reactive Extensions code goes here that subscribes to all 3 events...
        //
        //   mSource.AddingThing
        //   mSource.ChangingThing
        //   mSource.RemovingThing
        // 
        //  ...filters and records a list of the events as they are received ( maintaining order of events too ),
        //  then every eventRelayInterval milliseconds, plays back the events in bulk to update the GUI 
        //  ( on the GUIs thread ).  Note that LINQ will be used to filter the Things so that a subset of
        //  Thing changes are relayed to the GUI - i.e. - not all Thing events are observed by the GUI.

    }


    public void PlayBackCachedEvents()
    {
        BulkChangesStart();   // Raise Event to notify GUI to suspend screen updates

        // Play back the list of events to push changes to ListView, TreeView, graphs, etc...
        //
        //  this.AddingThing(Thing);        // Fire events in order received
        //  this.ChangingThing(Thing);      // Fire events in order received
        //  this.RemovingThing(Thing);      // Fire events in order received

        BulkChangesEnd();   // Raise Event to notify GUI to allow control refresh
    }

説明されているタスクを考えると、一般的なクラスコードで何を調整する必要があり、RXステートメントに何を埋め込む必要があるのか​​わかりません。

また、3つのイベントを1つのイベントに結合し、イベントの目的を示す列挙を使用すると、作業が大幅に簡素化されるという事実にも感謝しますが、実際のアプリケーションにキャッシュするイベントは3つだけではありません。各イベントを識別するために一定のSwitchステートメントテストを使用する必要があるというオーバーヘッドは必要ありません。潜在的に多数のGUIインターフェースにルーティングされる大量のイベントがあります。

提案をありがとう。

4

2 に答える 2

5

ああ、私はRxで遊ぶのが大好きです...これが1つのアプローチです。最初に部分的に、次に全体:

編集:コメントに合うように変更

まず、イベントのストリームを設定する必要があります。その間、ブリッジが公開する「古い」.NETイベントパターンをRxの「新しいhawtness」に置き換えましょう。

public Subject<Thing> BufferedAdds {get; private set;}
public Subject<Thing> BufferedChanges {get; private set;}
public Subject<Thing> BufferedRemoves {get; private set;}

_adds = Observable.FromEvent<EventDelegateAdd, Thing>(
    ev => new EventDelegateAdd(ev),
    h => mSource.AddingThing += h,
    h => mSource.AddingThing -= h);
BufferedAdds = new Subject<Thing>();

_changes = Observable.FromEvent<EventDelegateChange, Thing>(
    ev => new EventDelegateChange(ev),
    h => mSource.ChangingThing += h,
    h => mSource.ChangingThing -= h);
BufferedChanges = new Subject<Thing>();

_removes = Observable.FromEvent<EventDelegateRemove, Thing>(
    ev => new EventDelegateRemove(ev),
    h => mSource.RemovingThing += h,
    h => mSource.RemovingThing -= h);
BufferedRemoves = new Subject<Thing>();

また、すべてを今に結び付けるつもりですIScheduler-IScheduler使用法は、あちこちで行うことなく、この血なまぐさいものをテストするための重要な側面ですThread.Sleep-私はこのトピックを研究することを強くお勧めします!

public MyEventCachingBridge(
    MyCustomDataSource source, 
    int eventRelayInterval, 
    IScheduler scheduler)
{

次に、すべての着信イベントを1つのストリームに結合し、時間枠に基づいてそのストリームを「チャンク」する必要があります。Bufferオペレーターはこれに最適です。

_buffer = Observable.Merge(scheduler,
        _adds.Select(e => Tuple.Create(e, ThingEventType.Add)), 
        _changes.Select(e => Tuple.Create(e, ThingEventType.Change)), 
        _removes.Select(e => Tuple.Create(e, ThingEventType.Remove)))
    .Buffer(TimeSpan.FromMilliseconds(eventRelayInterval), scheduler);

イベントのタイプをストリームにパックバックしていることに注意してください。これは、再生中に適切なアクションを実行できるようにするためです。列挙型は次のとおりです。

private enum ThingEventType
{
    Add,
    Change,
    Remove
}

したがって、バッチ処理されたイベントをリッスンして保持するための何かが必要になります。ここにはいくつかのオプションがありますが、List同期を使用した単純なものを使用しましょう。

private Queue<IList<Tuple<Thing,ThingEventType>>> _eventQueue;
private static object SyncRoot = new object();

_eventQueue = new Queue<IList<Tuple<Thing,ThingEventType>>>();

// A serial disposable is a sort of "Disposable holder" - when you change it's
// Disposable member, it auto-disposes what you originally had there...no real
// need for it here, but potentially useful later
_watcherDisposable = new SerialDisposable();
_watcherDisposable.Disposable = _buffer
    .ObserveOn(_scheduler)
    .Subscribe(batch => 
    { 
        lock(SyncRoot) { _eventQueue.Enqueue(batch); }
    });
    _disposables.Add(_watcherDisposable);

eventRelayIntervalまた、再生バーストを「自動配線」して、ミリ秒ごとにパルスを送りましょう。

var pulse = Observable.Interval(
       TimeSpan.FromMilliseconds(eventRelayInterval), 
       _scheduler);
_disposables.Add(pulse
      .ObserveOn(_scheduler)
      .Subscribe(x => PlayBackCachedEvents()));

サブスクリプションは常にIDisposableであり、それらを破棄する必要あるので、そのためにいくつかのものを追加しましょう。

public class MyEventCachingBridge : IDisposable
{
    CompositeDisposable _disposables;

    public void Dispose()
    {
        _disposables.Dispose();
    }

そして今、再生のために:

public void PlayBackCachedEvents()
{
    BulkChangesStart();   // Raise Event to notify GUI to suspend screen updates

    // Play back the list of events to push changes 
    lock(SyncRoot)
    {
        foreach(var batch in _eventQueue)
        {
            // Play back the list of events to push changes to ListView, TreeView, graphs, etc...            
            foreach(var evt in batch)
            {
                switch(evt.Item2)
                {
                    case ThingEventType.Add: BufferedAdds.OnNext(evt.Item1); break;
                    case ThingEventType.Change: BufferedChanges.OnNext(evt.Item1);break;
                    case ThingEventType.Remove: BufferedRemoves.OnNext(evt.Item1);break;
                }
            }
        }
        _eventQueue.Clear();
    }
    BulkChangesEnd();   // Raise Event to notify GUI to allow control refresh
}

今-私たちはすべて消費者側でも派手になりたいので、UIウィンドウをモックアップしましょう(これはWPFであり、それに応じて調整します):

public class BridgeConsumer : Window, IDisposable
{
    private readonly CompositeDisposable _disposables;
    private IScheduler _scheduler;
    private StackPanel _panel;

    public void OnLoaded(object sender, RoutedEventArgs ea)
    {
        _panel = new StackPanel();
        this.Content = _panel;
    }

    public BridgeConsumer(MyEventCachingBridge bridge, IScheduler scheduler)
    {
        // for cleanup of any subscriptions
        _disposables = new CompositeDisposable();
        _disposables.Add(bridge);
        _scheduler = scheduler;

        Loaded += OnLoaded;

        // setup a listener for the bulk start/end events on the bridge
        var bulkStart = Observable.FromEvent(
                h => bridge.BulkChangesStart += new EventDelegateBulkChangesStart(h),
                h => bridge.BulkChangesStart -= new EventDelegateBulkChangesStart(h));
        var bulkEnd = Observable.FromEvent(
                h => bridge.BulkChangesEnd += new EventDelegateBulkChangesEnd(h),
                h => bridge.BulkChangesEnd -= new EventDelegateBulkChangesEnd(h));

        // the "meaty bit" - 
        //    1. create a "window" defined by bulk start/end events
        //    2. inside that "window", trap any occurrences on a 
        //          merged view of adds/changes/removes
        //    3. foreach event in that window, select that event
        //         (i.e., give us window contents as a stream of sorts)
        var bridgeWatcher =
            from thingEventWindow in 
                Observable.Merge(
                    bridge.BufferedAdds.Select(t => Tuple.Create("add", t)), 
                    bridge.BufferedChanges.Select(t => Tuple.Create("change", t)), 
                    bridge.BufferedRemoves.Select(t => Tuple.Create("remove", t))
                )
                .Window(bulkStart, start => bulkEnd)
            from thingEvent in thingEventWindow
            select thingEvent;

        // this could just as easily be a method, a bound call to the viewmodel, etc
        Action<Thing, string, DateTimeOffset> addToList = (thing, msg, ts) => 
        {
            var text = new TextBlock() 
            { 
                Text = string.Format(
                    "At:{0} Key:{2} Msg:{3} - nowTime = {1}", 
                    thing.TimeStamp, 
                    ts, 
                    thing.Key, 
                    msg) };
            _panel.Children.Add(text);
        };

        _disposables.Add(bridgeWatcher
            // CAREFUL! "ObserveOn" means what you'd think "SubscribeOn" would
            .ObserveOnDispatcher()
            .Subscribe(tup => 
            {
                addToList(tup.Item2, tup.Item1, _scheduler.Now);
            }));
    }

    public void Dispose()
    {
        // clean up
        if(_disposables != null) _disposables.Dispose();
    }
}

シバン全体:

void Main()
{
    var scheduler = Scheduler.Default;
    var rnd = new Random();
    var canceller = new CancellationTokenSource();

    var source = new MyCustomDataSource();    
    var eventRelayInterval = 2000;
    var bridge = new MyEventCachingBridge(source, eventRelayInterval, scheduler);

    var window = new BridgeConsumer(bridge);
    window.Closed += (o,e) => { canceller.Cancel(); window.Dispose(); };
    window.Show();

    Task.Factory.StartNew(
        () => 
        {
            while(true)
            {
                var thing = new Thing() 
                { 
                    Key = "added thing " + rnd.Next(0, 100), 
                    Title = "title for added thing", 
                    TimeStamp = scheduler.Now.DateTime 
                };
                source.FireAdd(thing);
                Thread.Sleep(rnd.Next(1,10) * 100);
            }
        }, canceller.Token);            
}

public class BridgeConsumer : Window, IDisposable
{
    private readonly CompositeDisposable _disposables;
    private StackPanel _panel;

    public void OnLoaded(object sender, RoutedEventArgs ea)
    {
        _panel = new StackPanel();
        this.Content = _panel;
    }

    public BridgeConsumer(MyEventCachingBridge bridge)
    {
        _disposables = new CompositeDisposable();
        _disposables.Add(bridge);

        Loaded += OnLoaded;

        var bulkStart = Observable.FromEvent(
                h => bridge.BulkChangesStart += new EventDelegateBulkChangesStart(h),
                h => bridge.BulkChangesStart -= new EventDelegateBulkChangesStart(h));
        var bulkEnd = Observable.FromEvent(
                h => bridge.BulkChangesEnd += new EventDelegateBulkChangesEnd(h),
                h => bridge.BulkChangesEnd -= new EventDelegateBulkChangesEnd(h));
        var bridgeWatcher =
            from thingEventWindow in 
                Observable.Merge(
                    bridge.BufferedAdds.Select(t => Tuple.Create("add", t)), 
                    bridge.BufferedChanges.Select(t => Tuple.Create("change", t)), 
                    bridge.BufferedRemoves.Select(t => Tuple.Create("remove", t))
                )
                .Window(bulkStart, start => bulkEnd)
            from thingEvent in thingEventWindow
            select thingEvent;

        Action<Thing, string> addToList = (thing, msg) => 
        {
            var text = new TextBlock() 
            { 
                Text = string.Format(
                    "At:{0} Key:{1} Msg:{2}", 
                    thing.TimeStamp, 
                    thing.Key, 
                    msg) 
            };
            _panel.Children.Add(text);
        };

        _disposables.Add(bridgeWatcher.ObserveOnDispatcher().Subscribe(tup => 
            {
                addToList(tup.Item2, tup.Item1);
            }));
    }

    public void Dispose()
    {
        if(_disposables != null) _disposables.Dispose();
    }
}


public delegate void EventDelegateAdd(Thing thing);
public delegate void EventDelegateChange(Thing thing);
public delegate void EventDelegateRemove(Thing thing);

public delegate void EventDelegateBulkChangesStart();
public delegate void EventDelegateBulkChangesEnd();

// The "Things" that are stored in MyCustomDataSource

public class Thing
{
    public DateTime TimeStamp {get; set;}
    public string Key { get; set; }
    public string Title { get; set; }
    public object OtherStuff { get; set; }
    public override string ToString()
    {
        return string.Format("At:{0} Key:{1} Title:{2}", this.TimeStamp, this.Key, this.Title);        
    }
}

// A custom observable data source with events that indicate when Things are
// added, changed, or removed.

public class MyCustomDataSource
{
    public event EventDelegateAdd AddingThing = delegate { };
    public event EventDelegateChange ChangingThing = delegate { };
    public event EventDelegateRemove RemovingThing = delegate { };

    // The rest of the class that manages the database of Things...
    public void FireAdd(Thing toAdd)
    {
        AddingThing(toAdd);
    }
    public void FireChange(Thing toChange)
    {
        ChangingThing(toChange);
    }
    public void FireRemove(Thing toRemove)
    {
        RemovingThing(toRemove);
    }
}

// This class forms a configurable event bridge between the MyCustomDataSource and
// the GUI.  The goal is to cache, filter, and throttle the events so that the GUI
// updates only occasionally with bulk changes that are relevant for that control.

public class MyEventCachingBridge : IDisposable
{
    private enum ThingEventType
    {
        Add,
        Change,
        Remove
    }

    private MyCustomDataSource mSource;
    private IScheduler _scheduler;

    public event EventDelegateBulkChangesStart BulkChangesStart = delegate { };
    public event EventDelegateBulkChangesEnd BulkChangesEnd = delegate { };

    public IObservable<Thing> RawAdds {get; private set;}
    public IObservable<Thing> RawChanges {get; private set;}
    public IObservable<Thing> RawRemoves {get; private set;}

    public Subject<Thing> BufferedAdds {get; private set;}
    public Subject<Thing> BufferedChanges {get; private set;}
    public Subject<Thing> BufferedRemoves {get; private set;}

    private readonly IObservable<IList<Tuple<Thing,ThingEventType>>> _buffer;
    private List<IList<Tuple<Thing,ThingEventType>>> _eventQueue;
    private static object SyncRoot = new object();

    private readonly CompositeDisposable _disposables;
    private readonly SerialDisposable _watcherDisposable;

    public MyEventCachingBridge(MyCustomDataSource source, int eventRelayInterval, IScheduler scheduler)
    {
        _disposables = new CompositeDisposable();
        mSource = source;
        _scheduler = scheduler;
        _eventQueue = new List<IList<Tuple<Thing,ThingEventType>>>();

        // Magical Reactive Extensions code goes here that subscribes to all 3 events...
        //
        //   mSource.AddingThing
        //   mSource.ChangingThing
        //   mSource.RemovingThing
        // 
        //  ...filters and records a list of the events as they are received ( maintaining order of events too ),
        //  then every eventRelayInterval milliseconds, plays back the events in bulk to update the GUI 
        //  ( on the GUIs thread ).  Note that LINQ will be used to filter the Things so that a subset of
        //  Thing changes are relayed to the GUI - i.e. - not all Thing events are observed by the GUI.
        RawAdds = Observable.FromEvent<EventDelegateAdd, Thing>(
            ev => new EventDelegateAdd(ev),
            h => mSource.AddingThing += h,
            h => mSource.AddingThing -= h);
        BufferedAdds = new Subject<Thing>();

        RawChanges = Observable.FromEvent<EventDelegateChange, Thing>(
            ev => new EventDelegateChange(ev),
            h => mSource.ChangingThing += h,
            h => mSource.ChangingThing -= h);
        BufferedChanges = new Subject<Thing>();

        RawRemoves = Observable.FromEvent<EventDelegateRemove, Thing>(
            ev => new EventDelegateRemove(ev),
            h => mSource.RemovingThing += h,
            h => mSource.RemovingThing -= h);
        BufferedRemoves = new Subject<Thing>();

        _buffer = Observable.Merge(
                    _scheduler,
                    RawAdds.Select(e => Tuple.Create(e, ThingEventType.Add)), 
                    RawChanges.Select(e => Tuple.Create(e, ThingEventType.Change)), 
                    RawRemoves.Select(e => Tuple.Create(e, ThingEventType.Remove)))
            .Buffer(TimeSpan.FromMilliseconds(eventRelayInterval), _scheduler);

        _watcherDisposable = new SerialDisposable();
        _watcherDisposable.Disposable = _buffer
            .ObserveOn(_scheduler)
            .Subscribe(batch => 
            { 
                lock(SyncRoot) { _eventQueue.Add(batch); }
            });
        _disposables.Add(_watcherDisposable);

        var pulse = Observable.Interval(TimeSpan.FromMilliseconds(eventRelayInterval), _scheduler);
        _disposables.Add(pulse.ObserveOn(_scheduler).Subscribe(x => PlayBackCachedEvents()));
    }

    private void PlayBackCachedEvents()
    {
        BulkChangesStart();   // Raise Event to notify GUI to suspend screen updates

        try
        {            
            //_eventQueue.Dump();
            lock(SyncRoot)
            {
                foreach(var batch in _eventQueue)
                {
                    // Play back the list of events to push changes to ListView, TreeView, graphs, etc...            
                    foreach(var evt in batch)
                    {
                        switch(evt.Item2)
                        {
                            case ThingEventType.Add: BufferedAdds.OnNext(evt.Item1); break;
                            case ThingEventType.Change: BufferedChanges.OnNext(evt.Item1);break;
                            case ThingEventType.Remove: BufferedRemoves.OnNext(evt.Item1);break;
                        }
                    }
                }
                _eventQueue.Clear();
            }
        }
        catch(Exception ex)
        {
            Console.WriteLine("Exception during playback:" + ex);
        }
        BulkChangesEnd();   // Raise Event to notify GUI to allow control refresh
    }

    public void Dispose()
    {
        _disposables.Dispose();
    }
}
于 2013-01-12T20:33:54.367 に答える
2

私はこの特定のショーに少し遅れていますが、あなたが必要としているのは、すべての追加、更新、削除の変更セットを提供する監視可能なキャッシュ/リストだと思います。

私はこれを実装し、オープンソースにしました:

ブログ:http ://dynamicdataproject.wordpress.com/

デモ:https ://github.com/RolandPheasant/TradingDemo

于 2014-11-25T14:41:01.193 に答える