2

メソッドに次のようなコードがあります。

ISubject<Message> messages = new ReplaySubject<Message>(messageTimeout);

public void HandleNext(string clientId, Action<object> callback)
{
    messages.Where(message => !message.IsHandledBy(clientId))
            .Take(1)
            .Subscribe(message =>
                       {
                           callback(message.Message);
                           message.MarkAsHandledBy(clientId);
                       });
}

への複数の同時呼び出しでと の間MarkAsHandledBy()で競合が発生しないように、それをコーディングする rx'y の方法は何ですか?IsHandledBy()HandleNext()

編集:

これはロングポーリング用です。HandleNext()Web リクエストごとに呼び出されます。リクエストは 1 つのメッセージしか処理できず、クライアントに返されます。次のリクエストは次のメッセージを受け取ります。

完全なコード (もちろんまだ進行中の作業です) は次のとおりです。

public class Queue
{
    readonly ISubject<MessageWrapper> messages;

    public Queue() : this(TimeSpan.FromSeconds(30)) {}

    public Queue(TimeSpan messageTimeout)
    {
        messages = new ReplaySubject<MessageWrapper>(messageTimeout);
    }

    public void Send(string channel, object message)
    {
        messages.OnNext(new MessageWrapper(new List<string> {channel}, message));
    }

    public void ReceiveNext(string clientId, string channel, Action<object> callback)
    {
        messages
            .Where(message => message.Channels.Contains(channel) && !message.IsReceivedBy(clientId))
            .Take(1)
            .Subscribe(message =>
                       {
                           callback(message.Message);
                           message.MarkAsReceivedFor(clientId);
                       });
    }

    class MessageWrapper
    {
        readonly List<string> receivers;

        public MessageWrapper(List<string> channels, object message)
        {
            receivers = new List<string>();
            Channels = channels;
            Message = message;
        }

        public List<string> Channels { get; private set; }
        public object Message { get; private set; }

        public void MarkAsReceivedFor(string clientId)
        {
            receivers.Add(clientId);
        }

        public bool IsReceivedBy(string clientId)
        {
            return receivers.Contains(clientId);
        }
    }
}

編集2:

現在、私のコードは次のようになっています。

public void ReceiveNext(string clientId, string channel, Action<object> callback)
{
    var subscription = Disposable.Empty;
    subscription = messages
        .Where(message => message.Channels.Contains(channel))
        .Subscribe(message =>
                   {
                       if (message.TryDispatchTo(clientId, callback))
                           subscription.Dispose();
                   });
}

class MessageWrapper
{
    readonly object message;
    readonly List<string> receivers;

    public MessageWrapper(List<string> channels, object message)
    {
        this.message = message;
        receivers = new List<string>();
        Channels = channels;
    }

    public List<string> Channels { get; private set; }

    public bool TryDispatchTo(string clientId, Action<object> handler)
    {
        lock (receivers)
        {
            if (IsReceivedBy(clientId)) return false;
            handler(message);
            MarkAsReceivedFor(clientId);
            return true;
        }
    }

    void MarkAsReceivedFor(string clientId)
    {
        receivers.Add(clientId);
    }

    bool IsReceivedBy(string clientId)
    {
        return receivers.Contains(clientId);
    }
}
4

2 に答える 2

2

Rx の悪夢を自分で作っているように思えます。Rx は、サブスクライバーをメッセージに関連付ける非常に簡単な方法を提供する必要があります。

私はあなたがあなたを保持する自己完結型のクラスを持っているという事実が好きReplaySubjectですOnCompleted.

ただし、このReceiveNext方法ではサブスクライバーを削除する方法は提供されません。少なくともメモリリークです。でクライアント ID を追跡すると、MessageWrapperメモリ リークが発生する可能性もあります。

ではなく、この種の関数を使用することをお勧めしますReceiveNext

public IDisposable RegisterChannel(string channel, Action<object> callback)
{
    return messages
        .Where(message => message.Channels.Contains(channel))
        .Subscribe(message => callback(message.Message));
}

とてもRxっぽいです。これは素晴らしい純粋なクエリであり、簡単に登録解除できます。

Action<object> callbackは間違いなく に直接関係しているので、clientIdメッセージ処理の重複を防ぐロジックをそこに入れることを考えます。

現在、コードは非常に手続き的であり、Rx には適していません。どうすれば Rx を最適に操作できるか、まだ頭に入っていないようです。これは良いスタートですが、(関数型プログラミングのように) もっと関数的に考える必要があります。


コードをそのまま使用する必要がある場合は、いくつかの変更をお勧めします。

これQueueを行うには:

public IDisposable ReceiveNext(
    string clientId, string channel, Action<object> callback)
{
    return
        messages
            .Where(message => message.Channels.Contains(channel))
            .Take(1)
            .Subscribe(message =>
                message.TryReceive(clientId, callback));
}

そして&MessageWrapperを取り除き、代わりにこれを行います:MarkAsReceivedForIsReceivedBy

    public bool TryReceive(string clientId, Action<object> callback)
    {
        lock (receivers)
        {
            if (!receivers.Contains(clientId))
            {
                callback(this.Message);
                receivers.Add(clientId);
                return true;
            }
            else
                return false;
        }
    }

ただし、なぜそうなのかはわかりません.Take(1)が、これらの変更により、原因によっては競合状態が軽減される場合があります。

于 2012-09-11T03:20:00.810 に答える
1

このようにRxを採用することが良い習慣であるかどうかはわかりません。Rx は、同時通知がないことを必要とするストリームの概念を定義します。

とはいえ、あなたの質問に答えるには、競合状態を避けるために、IsReceivedByandMarkAsReceivedForメソッド内にロックを入れてください。

より良いアプローチとしては、処理業務全体を放棄し、リクエストの受信時にConcurrentQueueTryDequeueメッセージを使用することができます (実行しているだけですTake(1)- これはキュー モデルに適合します)。Rx は、各メッセージに TTL を与えてキューから削除するのに役立ちますが、着信要求でそれを行うこともできます。

于 2012-09-10T20:42:33.137 に答える