0

複数の TCP 接続からデータを受信するサーバー アプリケーションを作成しています。最大 200 接続まで拡張できるようにしたいと考えています。このために私が書いた最初のアルゴリズムは次のとおりです。

while (keepListening)
{
    foreach (TcpClient client in clientList)
    {
        if (!client.Connected)
        {
            client.Close();
            deleteList.Add(client);
            continue;
        }

        int dataAvail = client.Available;
        if (dataAvail > 0)
        {
            NetworkStream netstr = client.GetStream();
            byte[] arry = new byte[dataAvail];
            netstr.Read(arry, 0, dataAvail);
            MemoryStream ms = new MemoryStream(arry);
            try
            {
                CommData data = dataDeserializer.Deserialize(ms) as CommData;
                beaconTable.BeaconReceived(data);
            }
            catch
            { }
        }
    }

    foreach (TcpClient clientToDelete in deleteList)
        clientList.Remove(clientToDelete);
    deleteList.Clear();

    while (connectionListener.Pending())
        clientList.Add(connectionListener.AcceptTcpClient());

    Thread.Sleep(20);
}

これは問題なく動作しますが、Thread.Sleep を追加してループを遅くする必要があることがわかりました。そうしないと、接続の数に関係なく、コア全体が占有されます。Thread.Sleep は一般的に悪いと考えられているとアドバイスされたので、いくつかの代替手段を探しました。これと同様の質問で、WaitHandles を使用して BeginRead と BeginAccept を使用するように勧められたので、それを使用して同じことを行うアルゴリズムを作成し、これを思いつきました。

while (keepListening)
{
    int waitResult = WaitHandle.WaitAny(waitList.Select(t => t.AsyncHandle.AsyncWaitHandle).ToArray(), connectionTimeout);

    if (waitResult == WaitHandle.WaitTimeout)
        continue;

    WaitObject waitObject = waitList[waitResult];
    Type waitType = waitObject.WaitingObject.GetType();

    if (waitType == typeof(TcpListener))
    {
        TcpClient newClient = (waitObject.WaitingObject as TcpListener).EndAcceptTcpClient(waitObject.AsyncHandle);
        waitList.Remove(waitObject);

        byte[] newBuffer = new byte[bufferSize];
        waitList.Add(new WaitObject(newClient.GetStream().BeginRead(newBuffer, 0, bufferSize, null, null), newClient, newBuffer));

        if (waitList.Count < 64)
            waitList.Add(new WaitObject(connectionListener.BeginAcceptTcpClient(null, null), connectionListener, null));
        else
        {
            connectionListener.Stop();
            listening = false;
        }
    }
    else if (waitType == typeof(TcpClient))
    {
        TcpClient currentClient = waitObject.WaitingObject as TcpClient;
        int bytesRead = currentClient.GetStream().EndRead(waitObject.AsyncHandle);

        if (bytesRead > 0)
        {
            MemoryStream ms = new MemoryStream(waitObject.DataBuffer, 0, bytesRead);
            try
            {
                CommData data = dataDeserializer.Deserialize(ms) as CommData;
                beaconTable.BeaconReceived(data);
            }
            catch
            { }
            byte[] newBuffer = new byte[bufferSize];
            waitList.Add(new WaitObject(currentClient.GetStream().BeginRead(newBuffer, 0, bufferSize, null, null), currentClient, newBuffer));
        }
        else
        {
            currentClient.Close();
        }

        waitList.Remove(waitObject);

        if (!listening && waitList.Count < 64)
        {
            listening = true;
            connectionListener.Start();
            waitList.Add(new WaitObject(connectionListener.BeginAcceptTcpClient(null, null), connectionListener, null));
        }
    }
    else
        throw new ApplicationException("An unknown type ended up in the wait list somehow: " + waitType.ToString());
}

これも、64 クライアントに到達するまで問題なく動作します。WaitAny が受け入れる WaitHandles の最大数であるため、64 を超えるクライアントを受け入れないように制限を書きました。この制限を回避する良い方法が見当たらないので、基本的にこのように 64 を超える接続を維持することはできません。Thread.Sleep アルゴリズムは、100 以上の接続で問題なく動作します。

また、データを受信した後に受信データの正確なサイズで割り当てるのではなく、任意のサイズの受信配列を事前に割り当てなければならないこともあまり好きではありません。とにかく WaitAny にタイムアウトを与える必要があります。そうしないと、接続がない場合にアプリケーションを閉じたときに、これを実行しているスレッドが参加できなくなります。そして、それは一般的により長く、より複雑です。

では、なぜ Thread.Sleep が悪い解決策なのでしょうか? 少なくとも WaitAny バージョンで 64 を超える接続を処理できるようにする方法はありますか? 私が見ていない、これを処理するまったく異なる方法はありますか?

4

1 に答える 1

0

Jim は、WaitHandles の代わりに Async コールバックを使用するという明白な提案をしました。最初はこれは複雑すぎると思っていましたが、状態オブジェクトで呼び出し元の TcpListener または TcpClient への参照を渡すことができることに気付いてからは、ずっと簡単になりました。これと、スレッドセーフのためのいくつかの変更により、準備完了です。100 以上の接続で正常にテストされ、正常に終了することに問題はありません。ただし、データ バッファーを事前に割り当てる必要はありません。同様のことをしようとしている人のためのコードは次のとおりです。

public class NetworkReceiver : IDisposable
{
    private IReceiver beaconTable;
    private XmlSerializer dataDeserializer;
    private HashSet<TcpClient> ClientTable;
    private TcpListener connectionListener;
    private int bufferSize = 1000;

    public NetworkReceiver(IReceiver inputTable)
    {
        beaconTable = inputTable;
        dataDeserializer = new XmlSerializer(typeof(CommData));

        ClientTable = new HashSet<TcpClient>();
        connectionListener = new TcpListener(IPAddress.Any, SharedData.connectionPort);
        connectionListener.Start();
        connectionListener.BeginAcceptTcpClient(ListenerCallback, connectionListener);
    }

    private void ListenerCallback(IAsyncResult callbackResult)
    {
        TcpListener listener = callbackResult.AsyncState as TcpListener;
        TcpClient client;

        try
        {
            client = listener.EndAcceptTcpClient(callbackResult);

            lock (ClientTable)
                ClientTable.Add(client);

            ClientObject clientObj = new ClientObject() { AsyncClient = client, Buffer = new byte[bufferSize] };
            client.GetStream().BeginRead(clientObj.Buffer, 0, bufferSize, ClientReadCallback, clientObj);

            listener.BeginAcceptTcpClient(ListenerCallback, listener);
        }
        catch (ObjectDisposedException)
        {
            return;
        }
    }

    private void ClientReadCallback(IAsyncResult callbackResult)
    {
        ClientObject clientObj = callbackResult.AsyncState as ClientObject;
        TcpClient client = clientObj.AsyncClient;

        if (!client.Connected)
            return;

        try
        {
            int bytesRead = client.GetStream().EndRead(callbackResult);
            if (bytesRead > 0)
            {
                MemoryStream ms = new MemoryStream(clientObj.Buffer, 0, bytesRead);
                try
                {
                    CommData data;
                    lock (dataDeserializer)
                        data = dataDeserializer.Deserialize(ms) as CommData;
                    lock (beaconTable)
                        beaconTable.BeaconReceived(data);
                }
                catch
                { }

                client.GetStream().BeginRead(clientObj.Buffer, 0, bufferSize, ClientReadCallback, clientObj);
            }
            else
            {
                client.Close();
                lock (ClientTable)
                    ClientTable.Remove(client);
            }
        }
        catch (Exception ex)
        {
            if (ex.GetType() == typeof(ObjectDisposedException) || ex.GetType() == typeof(InvalidOperationException))
                return;
            else
                throw;
        }
    }

    class ClientObject
    {
        public TcpClient AsyncClient;
        public byte[] Buffer;
    }

    public void Dispose()
    {
        connectionListener.Stop();
        foreach (TcpClient client in ClientTable)
            client.Close();
    }
}
于 2013-03-29T22:00:13.830 に答える