3

次のインターフェイスを持つクラスのグループがあります。

public interface RoutedEventReceiver<T>
{
    IDisposable Apply(IObservable<T> stream);
    bool ShouldForwardEvent(T anEvent);
}

私がやりたいことは、これらのクラスのスタックを維持し、各イベントがShouldForwardEvent(T)述語を介してフィルター処理され、結果IObservable<T>が次のレシーバーに渡されることです。また、プログラムの実行中に新しいレシーバーをプッシュおよびポップできるようにしたいと考えています (ある時点で、スタックから他のコレクションに移動したい場合がありますが、今のところスタックで十分です)。

私が現在持っているものは機能しますが、それが非常に「Rx」であるようには感じません。この命令的なロジックをすべて使わずに、やりたいことを行う方法があるに違いないと確信しています。

private void Refresh()
{
    // _subscriptions is a list of previous subscriptions
    foreach (var subscription in _subscriptions)
        subscription.Dispose();
    _subscriptions.Clear();

    // _stream is my stream of incoming events
    if (_stream != null)
    {
        var stream = _stream;

        foreach (var eventReceiver in _eventReceivers)
        {
            // add the subscription so it can be disposed next Refresh()
            _subscriptions.Add(eventReceiver.Apply(stream));

            // filter the stream for the next event receiver
            stream = stream.Where(eventReceiver.ShouldForwardEvent);
        }
    }
}

上記のメソッドは、IPushまたはPopスタック上でいつでも呼び出されます。

上記の意図を表現するための、よりクリーンで機能的な方法はありますか? 私は試してみ.Publish()ましたが、ほとんど成功しませんでした - おそらく私はそれを十分に知りません.

4

3 に答える 3

0

私はPublishアプローチを機能させることができましたが、次のリストを保持する必要性を取り除く以外に、私にはあまり余裕がありませんIDisposables:

private void Refresh()
{
    _published.DisposeIfNotNull();

    if (_stream != null)
    {
        var connectable = _stream.Publish();
        _published = connectable.Connect();
        var stream = connectable.AsObservable();

        foreach (var eventReceiver in _eventReceivers)
        {
            eventReceiver.Apply(stream);
            stream = stream.Where(eventReceiver.ShouldForwardEvent);
        }
    }
}
于 2013-06-23T17:05:17.597 に答える
0

以下のクラス (Chain Of Responsibility* Stack の CORStack という名前) は、あなたが求めていることを実行しようとします。内部的には、ShouldHandle bool をストリームに追加し、これを使用して処理するかどうかを決定します。Push標準の 、Pop、およびPeekメソッドを公開します。

public sealed class CORStack<T>
{
    Stack<StackFrame> _handlers;

    public CORStack(IObservable<T> source)
    {
        _handlers = new Stack<StackFrame>();
        _handlers.Push(new StackFrame(
            source.Select(t => new ShouldHandleWrapper(t, true)),
            new Handler<T>(new Action<T>(t => { }), true)));
    }

    public void Push(Handler<T> handler)
    {
        _handlers.Push(new StackFrame(_handlers.Peek().Observable, handler));
    }

    public Handler<T> Peek()
    {
        return _handlers.Peek().Handler;
    }

    public Handler<T> Pop()
    {
        var frame = _handlers.Pop();
        frame.Dispose();
        return frame.Handler;
    }

    class StackFrame : IDisposable
    {
        IDisposable _unsub;

        public IObservable<ShouldHandleWrapper> Observable { get; private set; }
        public Handler<T> Handler { get; private set; }

        public StackFrame(IObservable<ShouldHandleWrapper> topOfStack, Handler<T> handler)
        {
            _unsub = topOfStack.Subscribe(shouldHandle =>
                {
                    if (shouldHandle.ShouldHandle)
                        handler.Action.Invoke(shouldHandle.Value);
                });
            Observable = topOfStack.Select(shouldHandle =>
                new ShouldHandleWrapper(shouldHandle.Value, shouldHandle.ShouldHandle && handler.Forward));
            Handler = handler;
        }

        public void Dispose()
        {
            _unsub.Dispose();
        }
    }

    class ShouldHandleWrapper
    {
        public readonly T Value;
        public readonly bool ShouldHandle;

        public ShouldHandleWrapper(T value, bool shouldHandle)
        {
            Value = value;
            ShouldHandle = shouldHandle;
        }
    }
}

public class Handler<T>
{
    public Action<T> Action { get; set; }
    public bool Forward { get; set; }

    public Handler(Action<T> action, bool forward)
    {
        Action = action;
        Forward = forward;
    }
}

*責任の連鎖ではないことに気づきましたが、より良い名前のATMが思いつきません。

于 2013-06-24T01:38:54.423 に答える