2

観察可能なメッセージのシーケンスが1つあります。これらのメッセージを処理できるサブスクライバーのセットがあります。各サブスクライバーには実行優先順位があります。各メッセージは、現在サブスクライブされているサブスクライバーのリストから選択された最も優先度の高いサブスクライバーによって1回処理される必要があります。サブスクライバーは常にシーケンスからサブスクライブ/サブスクライブ解除されているため、シーケンスを作成する際のサブスクライバーの数と優先順位はわかりません。rxを使用した可能性のある/実行可能なソリューションですか?

説明する:

public class Message
{
    public string Value { get; set; }
    public bool IsConsumed { get; set; }
}

var subject = new Subject<Message>();
var sequence = subject.Publish().RefCount();

Action<Message, int> subscriber = (m, priority) =>
{
    if (!m.IsConsumed)
    {
        m.IsConsumed = true;
        Trace.WriteLine(priority);
    }
};

var s2 = sequence.Priority(2).Subscribe(m => subscriber(m, 2));
var s1 = sequence.Priority(1).Subscribe(m => subscriber(m, 1));

subject.OnNext(new Message()); // output: 1

s1.Dispose();
subject.OnNext(new Message()); // output: 2

このソリューションを機能させるために欠けている部分は、Rxライブラリに存在しないPriorityメソッドです。

4

1 に答える 1

2

これは非常に興味深い問題でした...

Priorityしたがって、最初に、この拡張機能で必要なものと同様の「ルーティング」効果を実現できる組み込みのRx演算子を認識していません。

そうは言っても、私は今日昼食時にLINQPadで遊んでいて、機能しているように見える(非常に)ハッキーな概念実証を思いついた。

まず、メッセージクラス

public class Message
{
    public string Value { get; set; }
    public bool IsConsumed { get; set; }
}

次に、拡張メソッドのラッパークラス:

public static class Ext
{    
    public static PrioritizedObservable<T> Prioritize<T>(this IObservable<T> source)
    {
        return new PrioritizedObservable<T>(source);
    }
}

そして、これは何PrioritizedObservable<T>ですか?

public class PrioritizedObservable<T> 
   : IObservable<T>, IObserver<T>, IDisposable
{
    private IObservable<T> _source;
    private ISubject<T,T> _intermediary;
    private IList<Tuple<int, Subject<T>>> _router;

    public PrioritizedObservable(IObservable<T> source)
    {
        // Make sure we don't accidentally duplicate subscriptions
        // to the underlying source
        _source = source.Publish().RefCount();

        // A proxy from the source to our internal router
        _intermediary = Subject.Create(this, _source);
        _source.Subscribe(_intermediary);        

        // Holds per-priority subjects
        _router = new List<Tuple<int, Subject<T>>>();
    }

    public void Dispose()
    {
        _intermediary = null;
        foreach(var entry in _router)
        {
            entry.Item2.Dispose();
        }
        _router.Clear();
    }

    private ISubject<T,T> GetFirstListener()
    {
        // Fetch the first subject in our router
        // ordered by priority 
        return _router.OrderBy(tup => tup.Item1)
            .Select(tup => tup.Item2)
            .FirstOrDefault();
    }

    void IObserver<T>.OnNext(T value)
    {
        // pass along value to first in line
        var nextListener = GetFirstListener();
        if(nextListener != null)
            nextListener.OnNext(value);
    }

    void IObserver<T>.OnError(Exception error)
    {
        // pass along error to first in line
        var nextListener = GetFirstListener();
        if(nextListener != null)
            nextListener.OnError(error);
    }

    void IObserver<T>.OnCompleted()
    {
        var nextListener = GetFirstListener();
        if(nextListener != null)
            nextListener.OnCompleted();
    }

    public IDisposable Subscribe(IObserver<T> obs)
    {
        return PrioritySubscribe(1, obs);
    }

    public IDisposable PrioritySubscribe(int priority, IObserver<T> obs)
    {
        var sub = new Subject<T>();
        var subscriber = sub.Subscribe(obs);
        var entry = Tuple.Create(priority, sub);
        _router.Add(entry);
        _intermediary.Subscribe(sub);
        return Disposable.Create(() => 
        {
            subscriber.Dispose();
            _router.Remove(entry);
        });
    }
}

そしてテストハーネス:

void Main()
{
    var subject = new Subject<Message>();
    var sequence = subject.Publish().RefCount().Prioritize();

    Action<Message, int> subscriber = (m, priority) =>
    {
        if (!m.IsConsumed)
        {
            m.IsConsumed = true;
            Console.WriteLine(priority);
        }
    };

    var s3 = sequence.PrioritySubscribe(3, Observer.Create<Message>(m => subscriber(m, 3)));
    var s2 = sequence.PrioritySubscribe(2, Observer.Create<Message>(m => subscriber(m, 2)));
    var s1 = sequence.PrioritySubscribe(1, Observer.Create<Message>(m => subscriber(m, 1)));
    var s11 = sequence.PrioritySubscribe(1, Observer.Create<Message>(m => subscriber(m, 1)));

    subject.OnNext(new Message()); // output: 1

    s1.Dispose();
    subject.OnNext(new Message()); // output: 1
    s11.Dispose();

    subject.OnNext(new Message()); // output: 2
    s2.Dispose();
    subject.OnNext(new Message()); // output: 3

    sequence.Dispose();

}
于 2013-02-26T00:46:08.697 に答える