何千ものクライアントが接続できるソケットアプリケーションがあります。それらをに格納し、ConcurrentDictionary<int, Socket>
要求/応答の状況でのみ動作します。
- データが必要な場合は、関連するソケットを見つけてリクエストを送信し、必要なデータを要求します。
- リクエストを送信した後、レスポンスを送信するまでバイトを受信します。それから私は受け取りをやめます。
このような:
public Task<Message> Request(int clientId, Message message)
{
Socket client;
return Clients.TryGetValue(clientId, out client)
? RequestInternal(client, message);
: _EmptyTask;
}
public async Task<Message> RequestInternal(Socket client, Message message)
{
await SendAsync(client, message).ConfigureAwait(false);
return await ReceiveOneAsync(client).ConfigureAwait(false);
}
次に、このアプリケーションを変更して、クライアントがいつでも何でも送信できるようにする必要があります。リクエストしなくても。これは、ソケットから常に受信する必要があり、まったく異なるアプローチが必要になると思います。
質問:
- この種のアプリケーションに対する賭け-既知のアプローチ(ベストプラクティス)は何ですか?
- あなたが私に話すことができる落とし穴やあなたが私に指摘することができるガイドはありますか?
私が念頭に置いていること:
免責事項:この部分は少し長く、完全に架空のものです。上記の質問に対する答えがあれば、それをスキップできます。
私が念頭に置いていること:
- バイトを絶えず受信し、アセンブルされたPDUをに追加します
BlockingCollection<Message>
。 BlockingCollection
のGetConsumingEnumerable
メソッドを使用して受信メッセージを処理するための専用のスレッドを作成します。
処理スレッドはこれを行います:
foreach (var message in Messages.GetConsumingEnumerable())
ProcessMessage(message);
これにより、クライアントが送信するすべてのものを受信して処理できますが、要求に応答するために送信されるメッセージと、クライアントが問題になる必要があるために送信されるメッセージを区別します。
リクエストで一意の識別子バイト(その特定のクライアントに固有)を送信できると思います。次に、クライアントはその識別子を応答で私に送り返すことができ、私はそれを使用して応答を区別することができます。
ProcessMessage(Message msg)
{
// msg is a message from msg.Sender.
if (msg.Id == 0)
{
// msg is not a response, do processing.
}
else
{
// msg is a response to the message that's sent with msg.Id.
// Find the request that:
// * ...is made to msg.Sender
// * ...and has the msg.Id as identifier.
// And process the response according to that.
}
}
これは、リクエストも保存する必要があることを意味します。架空のバージョンは次のRequestInternal
とおり
です。編集: StephenClearyの回答の後にWait
呼び出しをsに置き換えました。await
private async Task RequestInternal(Socket client, Message message)
{
var request = new Request(client, message);
Requests.Add(request);
await SendAsync(client, message).ConfigureAwait(false);
return await request.Source.Task.ConfigureAwait(false);
}
そしてRequest
クラス:
private sealed class Request
{
public readonly byte Id;
public readonly Socket Client;
public readonly Message Message;
public readonly TaskCompletionSource<Message> Source;
public Request(Socket client, Message message)
{
Client = client;
Message = message;
Source = new TaskCompletionSource<Message>();
// Obtain a byte unique to that socket...
Id = GetId(client);
}
}
そしてProcessMessage
これになります:
ProcessMessage(Message msg)
{
if (msg.Id == 0)
OnReceived(msg); // To raise an event.
else
{
// Method to find a request using msg.Sender and msg.Id
var request = Requests.Find(msg);
if (request != null)
request.Source.SetResult(msg);
}
}
どんなコレクションタイプRequests
になるのかわかりませんが。
編集:ConcurrentDictionary<Key, Request>
where isは、 (ソケットのID)フィールドと(メッセージのID)フィールドKey
を持つプライベート構造体を使用しました。また、を実装します。Int32
Byte
IEquatable<T>