1

何千ものクライアントが接続できるソケットアプリケーションがあります。それらをに格納し、ConcurrentDictionary<int, Socket>要求/応答の状況でのみ動作します。

  • データが必要な場合は、関連するソケットを見つけてリクエストを送信し、必要なデータを要求します。
  • リクエストを送信した後、レスポンスを送信するまでバイトを受信します。それから私は受け取りをやめます。

このような:

public Task<Message> Request(int clientId, Message message)
{
    Socket client;
    return Clients.TryGetValue(clientId, out client)
        ? RequestInternal(client, message);
        : _EmptyTask;
}

public async Task<Message> RequestInternal(Socket client, Message message)
{
    await SendAsync(client, message).ConfigureAwait(false);
    return await ReceiveOneAsync(client).ConfigureAwait(false);
}

次に、このアプリケーションを変更して、クライアントがいつでも何でも送信できるようにする必要があります。リクエストしなくても。これは、ソケットから常に受信する必要があり、まったく異なるアプローチが必要になると思います。

質問:

  1. この種のアプリケーションに対する賭け-既知のアプローチ(ベストプラクティス)は何ですか?
  2. あなたが私に話すことができる落とし穴やあなたが私に指摘することができるガイドはありますか?

私が念頭に置いていること:
免責事項:この部分は少し長く、完全に架空のものです。上記の質問に対する答えがあれば、それをスキップできます。

私が念頭に置いていること:

  • バイトを絶えず受信し、アセンブルされたPDUをに追加しますBlockingCollection<Message>
  • BlockingCollectionGetConsumingEnumerableメソッドを使用して受信メッセージを処理するための専用のスレッドを作成します。

処理スレッドはこれを行います:

foreach (var message in Messages.GetConsumingEnumerable())
    ProcessMessage(message);

これにより、クライアントが送信するすべてのものを受信して​​処理できますが、要求に応答するために送信されるメッセージと、クライアントが問題になる必要があるために送信されるメッセージを区別します。

リクエストで一意の識別子バイト(その特定のクライアントに固有)を送信できると思います。次に、クライアントはその識別子を応答で私に送り返すことができ、私はそれを使用して応答を区別することができます。

ProcessMessage(Message msg)
{
    // msg is a message from msg.Sender.

    if (msg.Id == 0)
    {
        // msg is not a response, do processing.
    }
    else
    {
        // msg is a response to the message that's sent with msg.Id.
        // Find the request that:
        // * ...is made to msg.Sender
        // * ...and has the msg.Id as identifier.
        // And process the response according to that.
    }
}

これは、リクエストも保存する必要があることを意味します。架空のバージョンは次のRequestInternalとおり です。編集: StephenClearyの回答の後にWait呼び出しをsに置き換えました。await

private async Task RequestInternal(Socket client, Message message)
{
    var request = new Request(client, message);
    Requests.Add(request);

    await SendAsync(client, message).ConfigureAwait(false);
    return await request.Source.Task.ConfigureAwait(false);
}

そしてRequestクラス:

private sealed class Request
{
    public readonly byte Id;
    public readonly Socket Client;
    public readonly Message Message;
    public readonly TaskCompletionSource<Message> Source;

    public Request(Socket client, Message message)
    {
        Client = client;
        Message = message;
        Source = new TaskCompletionSource<Message>();

        // Obtain a byte unique to that socket...
        Id = GetId(client);
    }
}

そしてProcessMessageこれになります:

ProcessMessage(Message msg)
{
    if (msg.Id == 0)
        OnReceived(msg); // To raise an event.
    else
    {
        // Method to find a request using msg.Sender and msg.Id
        var request = Requests.Find(msg);

        if (request != null)
            request.Source.SetResult(msg);
    }
}

どんなコレクションタイプRequestsになるのかわかりませんが。

編集:ConcurrentDictionary<Key, Request> where isは、 (ソケットのID)フィールドと(メッセージのID)フィールドKeyを持つプライベート構造体を使用しました。また、を実装します。Int32ByteIEquatable<T>

4

1 に答える 1

1

数年前に、いくつかの一般的な問題(メッセージのフレーミング継続的な読み取り一般的なエラーの説明など)に対処するTCP / IP .NETSocketsFAQを作成しました。コードサンプルはすべてクラスを使用しますが、同じ概念がすべてのTCP/IPソケットに適用されます。Socket

プロトコルの設計と要求/応答のマッチングに関しては、全体的なアプローチは適切に聞こえます。スレッドセーフであることを確認する必要があります(たとえば、RequestsおそらくConcurrentDictionary)。また、をawait SendAsync呼び出すのではなく、する必要がありますWait

私が試したが本番環境に移行していない別のアプローチは、 TPLデータフローに基づいています。各クライアントの「出力」を表す1つのブロックと、「入力」の別のブロックを作成できます。次に、その上にメッセージフレーミングを重ね、それに一致する要求/応答を重ねて、残りの(未承諾の)メッセージを単一の共有に送信できますBufferBlock

したがって、「エンドユーザー」APIは次のようになります。

// Send a request and asynchronously receive a matching response.
Task<Message> RequestAsync(int clientId, Message message);

// Endpoint for unsolicited messages.
IReceivableSourceBlock<Tuple<int, Message>> UnsolicitedMessages { get; }

次に、ActionBlocktoを接続しUnsolicitedMessagesて、デリゲートが入ってくるたびにデリゲートを実行できます。

于 2012-12-21T16:49:06.703 に答える