1

「クライアント」の複数のインスタンスに接続された「ホスト」の単一のインスタンスを中心に構築された単純なメッセージングフレームワークがあります。各インスタンスは、メッセージをプッシュすることで他のインスタンスと通信します。次に、インスタンスは、別のタスクを使用して受信した順序で独自のメッセージを処理します。

コードの簡略化されたバージョンは次のとおりです。

interface IMessage
{
}

class Host
{
    ConcurrentBag<Client> _Clients = new ConcurrentBag<Client>();
    BlockingCollection<IMessage> _IncomingMessages = new BlockingCollection<IMessage>();

    public Host()
    {
        Task.Factory.StartNew(() =>
        {
            foreach (var message in _IncomingMessages.GetConsumingEnumerable())
            {
                ProcessIncomingMessage(message);
            }
        });
    }

    public void AddClient(Client client)
    {
        _Clients.Add(client);
    }

    private void ProcessIncomingMessage(IMessage message)
    {
        // consume the message and typically generate new messages
        // For now just echo the message back
        Broadcast(message);  
    }

    private void Broadcast(IMessage message)
    {
        foreach (var client in _Clients)
        {
            client.PushMessage(message);
        }
    }
}


class Client
{
    private Host _Host;
    private BlockingCollection<IMessage> _IncomingMessages = new BlockingCollection<IMessage>();

    public Client(Host host)
    {
        Task.Factory.StartNew(() =>
        {
            foreach (var message in _IncomingMessages.GetConsumingEnumerable())
            {
                ProcessIncomingMessage(message);
            }
        });

        _Host = host;
        _Host.AddClient(this);
    }

    public void PushMessage(IMessage message)
    {
        _IncomingMessages.Add(message);
    }

    private void ProcessIncomingMessage(IMessage message)
    {
        // interpret the message and update state
    }
}

私の要件は、メッセージを処理する各インスタンスが(可能であれば)並列実行を利用し、各メッセージがクラスごとに一度に1つずつ受信された順序で処理されることです。

GetConsumingEnumerable()からプルするだけの新しいタスクを生成するのは悪い考えですか?

リアクティブ拡張機能とIObserveableを使用してこれを単純化できるように感じますが、Rxの使用にはまだかなり慣れておらず、並列実行のニーズを満たすためにRxをどのように構成するかがわかりません。

どんなガイダンスでも大歓迎です!

4

1 に答える 1

3

にメッセージを渡し、からすべてのクライアントにメッセージIObservableを渡すための仲介者としてRx(具体的には)を使用することで、確かにあなたの生活をはるかに楽にすることができます。コードへの2つの変更(どちらか一方、または両方を実行できます)は、入力としてtoを渡し、それを生成させることと、同様にtoを渡すことです。HostHostIObservable<IMessage>HostIObservable<IMessage>IObservable<IMessage>Client

私が見ることができることから、そしてお互いについて知る必要はHostありません(これは素晴らしいです)。がクライアントを積極的に管理する必要がある場合(たとえば、クライアントをキックオフするClient場合)、これは機能しません-これらの場合、以下のコードを変更してクライアントサブスクリプションを管理する必要があります。これは重要ではありませんが、サブスクリプションを要求するクライアントが含まれることは確かです。ホストから、および結果を保持しているホストから。HostHostIDisposable

また、以下は一般的にエラーやサブスクリプション解除を処理していません。これらはそれほど問題なく追加できます。それはあなたと同等の機能を備えたスケルトンです。

class Host
{
    private Subject<IMessage> _outbound;

    public Host(IObservable<IMessage> messages)
    {
        _outbound = new Subject<IMessage>();        
        messages.SubscribeOn(Scheduler.TaskPool).Subscribe(ProcessIncomingMessage);
    }

    private void ProcessIncomingMessage(IMessage message)
    {
        _outbound.OnNext(message); // just echo
    }

    public IObservable<IMessage> Messages { get { return _outbound.AsObservable(); } }
}

class Client
{
    public Client(IObservable<IMessage> messages)
    {
        messages.SubscribeOn(Scheduler.TaskPool).Subscribe(ProcessIncomingMessage);
    }

    private void ProcessIncomingMessage(IMessage message)
    {
        // interpret the message and update state
    }
}

次に、使用法は簡単です。

var source = new Subject<long>();

var host = new Host(source);
var client1 = new Client(host.Messages);
var client2 = new Client(host.Messages);
var client3 = new Client(host.Messages);

source.OnNext(1); // assuming each of these is an IMessage
source.OnNext(2);
source.OnNext(3);

もちろん、任意のIObservableソースを使用してを駆動できますHost

メッセージの処理が「長時間実行」である場合Scheduler.NewThreadは、SubscribeOn呼び出しで使用することに注意してください。それ以外の場合、各メッセージは現在、Task Pool処理対象に追加されています。

于 2012-07-11T21:26:42.720 に答える