1

私は、異なるクライアントからの複数の tcp 接続を受け入れる ac# アプリケーション (.net 4) に取り組んでいます。ソケットを受け入れる単一の tcp リスナーがあります。デュプレックスでの通信 b/w ノード。データは Networkstream.Write メソッドを使用して送信され、Networkstream.read メソッドを使用して読み取られます。TCP 接続ごとに、個別のスレッドが作成されます。

問題は、数日前にクライアントの 1 つが (バグにより) 20 分間データの読み取りを停止したことに気付いたことです。接続は切断されていないため、サーバーで (IO) 例外は発生しませんでした。ただし、他のクライアントのデータも移動していないことに気付きました。20 分後、そのクライアントは再びデータの受信を開始し、すぐに他のクライアントもデータの受信を開始しました。

ネットワーク ストリームの書き込みメソッドがブロッキング メソッドであり、タイムアウトを使用していないことはわかっています。そのため、書き込みがブロックされている可能性があります (ここで説明します)。しかし、私が理解しているように、TCP接続ごとに個別の書き込みバッファが必要であるか、それとももっと何かが働いています。tcp 接続での送信ブロックは、同じアプリケーション内の他の tcp 接続に影響を与えますか?

書き込み操作の擬似コードを次に示します。接続ごとに、別のスレッドによる別の送信キュー プロセスがあります。

public class TCPServerListener : baseConnection
{

    private readonly int _Port;
    private TcpListener _tcpListener;
    private Thread _thread;
    private List<TcpClientData> _tcpClientDataList = new List<TcpClientData>();
    private long _messageDiscardTimeout;
    private bool LoopForClientConnection = true;

    public TCPServerListener(int port, ThreadPriority threadPriority)
    {
        try
        {
            // init property
        }
        catch (Exception ex)
        {
            // log
        }
    }

    public void SendMessageToAll(int type)
    {
        base.EnqueueMessageToSend(type, _tcpClientDataList);
    }
    public void SendMessageToList(int type, IList<TcpClient> tcpClientList)
    {
        base.EnqueueMessageToSend(type, tcpClientList);
    }
    public void SendMessage(int type, TcpClient tcpClient)
    {
        base.EnqueueMessageToSend(type, tcpClient);
    }



    private void AcceptClientConnections()
    {
        while (LoopForClientConnection)
        {
            try
            {
                Socket socket = _tcpListener.AcceptSocket();
                TcpClientData tcpClientData = new TcpClientData();
                tcpClientData.tcpClientThread = new Thread(new ParameterizedThreadStart(StartAsync));
                tcpClientData.tcpClientThread.Priority = _threadPriority;
                tcpClientData.tcpClientThread.IsBackground = true;
                tcpClientData.tcpClientThread.Name = "CD" + tcpClientData.tcpClientThread.ManagedThreadId;
                tcpClientData.tcpClient = new TcpClient();
                tcpClientData.tcpClient.Client = socket;
                _tcpClientDataList.Add(tcpClientData);
                tcpClientData.tcpClientThread.Start(tcpClientData.tcpClient);
            }
            catch (ThreadAbortException ex)
            {
                //log

            }
            catch (Exception ex)
            {
                //log
            }
        }
    }




    public override void Start()
    {
        base.Start();
        _tcpListener = new TcpListener(System.Net.IPAddress.Any, _Port);

        _thread = new Thread(AcceptClientConnections);
        _thread.Priority = _threadPriority;
        _thread.IsBackground = true;

        _tcpListener.Start();
        _thread.Start();
    }

    public override void Stop()
    {
       // stop listener and terminate threads
    }
}


public class baseConnection
{
    private Thread _InCommingThread;
    private Thread _OutGoingThread;
    protected ThreadPriority _threadPriority;
    protected BlockingCollection<MessageReceived> _InComingMessageQueue = new BlockingCollection<MessageReceived>();
    protected BlockingCollection<MessageToSend> _OutgoingMessageQueue = new BlockingCollection<MessageToSend>();

    public void StartAsync(Object oTcpClient)
    {
        TcpClient tcpClient = oTcpClient as TcpClient;
        if (tcpClient == null)
            return;

        using (tcpClient)
        {
            using (NetworkStream stream = tcpClient.GetStream())
            {
                stream.ReadTimeout = Timeout.Infinite;
                stream.WriteTimeout = Timeout.Infinite;

                BinaryReader bodyReader = new BinaryReader(stream);

                while (tcpClient.Connected)
                {
                    try
                    {
                        int messageType = bodyReader.ReadInt32();

                        // checks to verify messages 

                        // enqueue message in incoming queue
                        _InComingMessageQueue.Add(new MessageReceived(messageType, tcpClient));
                    }
                    catch (EndOfStreamException ex)
                    {
                        // log
                        break;
                    }
                    catch (Exception ex)
                    {
                        // log
                        Thread.Sleep(100);
                    }
                }
                //RaiseDisconnected(tcpClient);
            }
        }
    }


    public virtual void Start()
    {
        _InCommingThread = new Thread(HandleInCommingMessnge);
        _InCommingThread.Priority = _threadPriority;
        _InCommingThread.IsBackground = true;
        _InCommingThread.Start();

        _OutGoingThread = new Thread(HandleOutgoingQueue);
        _OutGoingThread.Priority = _threadPriority;
        _OutGoingThread.IsBackground = true;
        _OutGoingThread.Start();
    }


    public virtual void Stop()
    {
       // stop the threads and free up resources
    }

    protected void EnqueueMessageToSend(int type, List<TcpClientData> tcpClientDataList)
    {
        tcpClientDataList.ForEach(x => _OutgoingMessageQueue.Add(new MessageToSend(type, x.tcpClient)));
    }
    protected void EnqueueMessageToSend(int type, IList<TcpClient> tcpClientList)
    {
        foreach (TcpClient tcpClient in tcpClientList)
        {
            _OutgoingMessageQueue.Add(new MessageToSend(type, tcpClient));
        }
    }
    protected void EnqueueMessageToSend(int type, TcpClient tcpClient)
    {
        _OutgoingMessageQueue.Add(new MessageToSend(type, tcpClient));
    }


    private void HandleOutgoingQueue()
    {
        while (true)
        {
            try
            {

                MessageToSend message = _OutgoingMessageQueue.Take();

                if (message.tcpClient.Connected)
                {
                    BinaryWriter writer = new BinaryWriter(message.tcpClient.GetStream());
                    writer.Write(message.type);
                }
            }
            catch (ThreadAbortException ex)
            {
                // log
                return;
            }
            catch (Exception ex)
            {
                //_logger.Error(ex.Message, ex);
            }
        }
    }

    private void HandleInCommingMessnge()
    {
        while (true)
        {
            try
            {
                MessageReceived messageReceived = _InComingMessageQueue.Take();

                // handle message
            }
            catch (ThreadAbortException ex)
            {
                // log
                return;
            }
            catch (Exception ex)
            {
                // log
                //_logger.Error(ex.Message, ex);
            }
        }
    }

    public class MessageReceived
    {
        public MessageReceived(int type, TcpClient tcpClient)
        {
            this.tcpClient = tcpClient;
            this.type = type;
        }

        public int type;
        public TcpClient tcpClient;
    }

    public class MessageToSend
    {
        public MessageToSend(int type, TcpClient tcpClient)
        {
            this.tcpClient = tcpClient;
            this.type = type;
        }

        public int type;
        public TcpClient tcpClient;
    }

    public class TcpClientData
    {
        public Thread tcpClientThread;
        public TcpClient tcpClient;
    }
}
4

1 に答える 1

1

接続ごとに個別のスレッドが作成されると述べていますが、表示されたコードは、任意の接続のメッセージをデキューできるようです。

このコードが複数のスレッドで実行されている場合、すべてのスレッドが現在ブロックしている接続にメッセージを送信しようとすると、プログラムはブロックされます。このループが複数のスレッドで実行される場合に直面する可能性のあるもう 1 つの問題は、メッセージが同じ接続に対して正しい順序で到着しない可能性があることです。

于 2013-01-10T10:22:39.073 に答える