「クライアント」の複数のインスタンスに接続された「ホスト」の単一のインスタンスを中心に構築された単純なメッセージングフレームワークがあります。各インスタンスは、メッセージをプッシュすることで他のインスタンスと通信します。次に、インスタンスは、別のタスクを使用して受信した順序で独自のメッセージを処理します。
コードの簡略化されたバージョンは次のとおりです。
interface IMessage
{
}
class Host
{
ConcurrentBag<Client> _Clients = new ConcurrentBag<Client>();
BlockingCollection<IMessage> _IncomingMessages = new BlockingCollection<IMessage>();
public Host()
{
Task.Factory.StartNew(() =>
{
foreach (var message in _IncomingMessages.GetConsumingEnumerable())
{
ProcessIncomingMessage(message);
}
});
}
public void AddClient(Client client)
{
_Clients.Add(client);
}
private void ProcessIncomingMessage(IMessage message)
{
// consume the message and typically generate new messages
// For now just echo the message back
Broadcast(message);
}
private void Broadcast(IMessage message)
{
foreach (var client in _Clients)
{
client.PushMessage(message);
}
}
}
class Client
{
private Host _Host;
private BlockingCollection<IMessage> _IncomingMessages = new BlockingCollection<IMessage>();
public Client(Host host)
{
Task.Factory.StartNew(() =>
{
foreach (var message in _IncomingMessages.GetConsumingEnumerable())
{
ProcessIncomingMessage(message);
}
});
_Host = host;
_Host.AddClient(this);
}
public void PushMessage(IMessage message)
{
_IncomingMessages.Add(message);
}
private void ProcessIncomingMessage(IMessage message)
{
// interpret the message and update state
}
}
私の要件は、メッセージを処理する各インスタンスが(可能であれば)並列実行を利用し、各メッセージがクラスごとに一度に1つずつ受信された順序で処理されることです。
GetConsumingEnumerable()からプルするだけの新しいタスクを生成するのは悪い考えですか?
リアクティブ拡張機能とIObserveableを使用してこれを単純化できるように感じますが、Rxの使用にはまだかなり慣れておらず、並列実行のニーズを満たすためにRxをどのように構成するかがわかりません。
どんなガイダンスでも大歓迎です!