3

最近、外部アプリの TCP リスナーに接続し、大量のデータを高頻度で処理するための小さな TCP クライアント アプリを作成する必要がありました。

例外をキャッチし、関心のあるいくつかのプロパティ (ネットワーク ストリームなど) への参照を保持するためだけに、TCPClient クラスの周りにラッパー クラスを作成しました。ラッパーは次のとおりです。

public class MyTCPClient
    {

        private string serverIP;
        private int serverPort;

        public TcpClient tcpClient = new TcpClient();
        private IPEndPoint serverEndPoint;
        private NetworkStream stream = null;

        public string name;

        public MyTCPClient(string serverIp, int serverPort, string parentName)
        {
            this.serverIP = serverIp;
            this.serverPort = serverPort;
            this.name = parentName + "_TCPClient";

            serverEndPoint = new IPEndPoint(IPAddress.Parse(serverIP), serverPort);

            tcpClient.ReceiveBufferSize = 1048576;

            this.TryConnect();
        }

        private bool TryConnect()
        {
            try
            {
                tcpClient.Connect(serverEndPoint);
            }
            catch (SocketException e1)
            {
                throw new ErrorOnConnectingException(e1, "SocketException while connecting. (see msdn Remarks section for more details. ) Error code: " + e1.ErrorCode);
            }
            catch (ArgumentNullException e2)
            {
                throw new ErrorOnConnectingException(e2, "ArgumentNullException while connecting. (The hostname parameter is null.) Message: " + e2.Message);
            }
            catch (ArgumentOutOfRangeException e3)
            {
                throw new ErrorOnConnectingException(e3, "ArgumentOutOfRangeException while connecting (The port parameter is not between MinPort and MaxPort. ). Message: " + e3.Message);
            }
            catch (ObjectDisposedException e4)
            {
                throw new ErrorOnConnectingException(e4, "ObjectDisposedException while connecting. (TcpClient is closed. ) Message: " + e4.Message);
            }


            try
            {
                stream = this.tcpClient.GetStream();
            }
            catch (ObjectDisposedException e1)
            {
                throw new ErrorOnGettingStreamException(e1, "ObjectDisposedException while acquiring Network stream. (The TcpClient has been closed. ) Message: " + e1.Message);
            }
            catch (InvalidOperationException e2)
            {
                throw new ErrorOnGettingStreamException(e2, "ArgumentOutOfRangeException while acquiring Network stream (The TcpClient is not connected to a remote host.  ). Message: " + e2.Message);
            }

            return true;
        }

        public string ReadData()
        {
            try
            {
                ASCIIEncoding encoder = new ASCIIEncoding();

                byte[] dataHeader = new byte[12];
                if (this.tcpClient.Connected)
                {
                    stream.Read(dataHeader, 0, 12);
                }
                else
                {
                    throw new ErrorOnReadingException(null, "The underlying TCP tcpClient is not connected any more");
                }

                var strHeaderMessage = System.Text.Encoding.Default.GetString(dataHeader);

                Utils.logToTimeStampedFile(strHeaderMessage, name);

                int bodyAndTailCount = Convert.ToInt32(strHeaderMessage.Replace("#", ""));
                byte[] dataBodyAndTail = new byte[bodyAndTailCount];

                if (this.tcpClient.Connected)
                {
                    stream.Read(dataBodyAndTail, 0, bodyAndTailCount);
                }
                else
                {
                    throw new ErrorOnReadingException(null, "The underlying TCP tcpClient is not connected any more");
                }

                var strBodyAndTailMessage = System.Text.Encoding.Default.GetString(dataBodyAndTail);

                Utils.logToTimeStampedFile(strBodyAndTailMessage, name);

                return strBodyAndTailMessage;

            }
            catch (FormatException e0)
            {
                CloseAllLeft();
                throw new ErrorOnReadingException(e0, "FormatException while reading data. (Bytes red are null or does not correspond to specification, happens on closing Server) Message: " + e0.Message);
            }
            catch (ArgumentNullException e1)
            {
                CloseAllLeft();
                throw new ErrorOnReadingException(e1, "ArgumentNullException while reading data. (The buffer parameter is null.) Message: " + e1.Message);
            }
            catch (ArgumentOutOfRangeException e2)
            {
                CloseAllLeft();
                throw new ErrorOnReadingException(e2, "ArgumentOutOfRangeException while reading data. (see msdn description) Message: " + e2.Message);
            }
            catch (IOException e3)
            {
                CloseAllLeft();
                throw new ErrorOnReadingException(e3, "IOException while reading data. (The underlying Socket is closed.) Message: " + e3.Message);
            }
            catch (ObjectDisposedException e4)
            {
                CloseAllLeft();
                throw new ErrorOnReadingException(e4, "ArgumentOutOfRangeException while reading data. (see msdn description) Message: " + e4.Message);
            }
        }

        public void CloseAllLeft()
        {
            try
            {
                stream.Close();
            }
            catch (Exception e)
            {
                Console.WriteLine("Exception closing tcp network stream: " + e.Message);
            }
            try
            {
                tcpClient.Close();
            }
            catch (Exception e)
            {
                Console.WriteLine("Exception closing tcpClient: " + e.Message);
            }
        }
    }

それでも、この MyTCPClient を使用するスレッドについては何も言及されていません。アプリには、異なるポートで接続し、異なるジョブを実行する 2 つの TCP クライアントが必要です。私は TCP プログラミングに不慣れで、プロパティをさまよった後、ブロッキング読み取りアプローチを使用することにしました。つまり、デフォルトでは、TCPClient.Read() メソッドは新しいデータが存在するまでスレッドをブロックします。外部アプリのリスナーを制御できないため、このようなアプローチが必要でした。サーバーの終了を認識する唯一の方法は、TCP ソケット仕様に従って送信された「ゼロバイト」でした。

そのため、後で上記の MyTCPClient クラスを使用するスレッドを維持および制御する抽象クラスを作成します (設計上、最終的には親スレッドをブロックする可能性があります)。私の抽象的なTCPManagerのコードは次のとおりです。

/// <summary>
    /// Serves as a dispatcher for the high frequency readings from the TCP pipe.
    /// Each time the thread is started it initializes new TCPClients which will attempt to connect to server.
    /// Once established a TCP socket connection is alive until the thread is not requested to stop.
    ///
    /// Error hanling level here:
    ///
    /// Resources lke NetworkStream and TCPClients are ensured to be closed already within the myTCPClient class, and the error handling here
    /// is steps on top of that - sending proper emails, notifications and logging.
    ///
    /// </summary>
    public abstract class AbstractmyTCPClientManager
    {

        public string name;
        public string serverIP;
        public int serverPort;

        public Boolean requestStop = false;
        public Boolean MyTCPClientThreadRunning = false;
        public Boolean requestStart = false;

        public myTCPClient myTCPClient;

        public int sleepInterval;

        public Thread MyTCPClientThread;

        public AbstractmyTCPClientManager(string name, string serverIP, int serverPort)
        {
            this.name = name;
            this.serverIP = serverIP;
            this.serverPort = serverPort;
        }

        public void ThreadRun()
        {
            MyTCPClientThreadRunning = false;
            bool TCPSocketConnected = false;
            bool AdditionalInitializationOK = false;

            // keep trying to init requested tcp clients
            while (!MyTCPClientThreadRunning && !requestStop) // and we are not suggested to stop
            {
                while (!TCPSocketConnected && !requestStop) // and we are not suggested to stop)
                {
                    try
                    {
                        myTCPClient = new myTCPClient(serverIP, serverPort, name);

                        TCPSocketConnected = true;
                    }
                    catch (ErrorOnConnectingException e0)
                    {

                        // nah, too long message
                        string detail = e0.originalException != null ? e0.originalException.Message : "No inner exception";
                        //Utils.logToTimeStampedFile("Creating connection attempt failed.(1." + e0.customMessage + " 2." + detail + "). Will retry in 10 seconds...", name);
                        //Utils.logToTimeStampedFile(e0.customMessage + " (" + detail + "). Will retry in 10 seconds...", name);
                        Utils.logToTimeStampedFile(detail + ". Will retry in 10 seconds...", name);

                        Thread.Sleep(10000);
                    }
                    catch (ErrorOnGettingStreamException e1)
                    {
                        // nah, too long message
                        string detail = e1.originalException != null ? e1.originalException.Message : "No inner exception";
                        //Utils.logToTimeStampedFile("Getting network stream attempt failed. (1." + e1.customMessage + " 2." + detail + "). Will retry in 10 seconds...", name);
                        //Utils.logToTimeStampedFile(e1.customMessage + " (" + detail + "). Will retry in 10 seconds...", name);

                        Utils.logToTimeStampedFile(detail + ". Will retry in 10 seconds...", name);
                        Thread.Sleep(10000);
                    }
                }
                Utils.logToTimeStampedFile("TCP Communication established", name);

                while (!AdditionalInitializationOK && !requestStop) // or we are not suggested to stop
                {
                    try
                    {
                        AdditionalInitialization();

                        AdditionalInitializationOK = true;

                    }
                    catch (AdditionalInitializationException e1)
                    {
                        string detail = e1.originalException != null ? e1.originalException.Message : "No inner exception";

                        //Utils.logToTimeStampedFile("Additional initialization failed (1." + e1.customMessage + " 2." + detail + "). Will retry in 10 seconds", name);

                        Utils.logToTimeStampedFile(e1.customMessage + ". Will retry in 10 seconds", name);
                        Thread.Sleep(10000);
                    }
                }

                MyTCPClientThreadRunning = TCPSocketConnected && AdditionalInitializationOK;
                ViewModelLocator.ControlTabStatic.updateUIButtons();
            }
            Utils.logToTimeStampedFile("Additional Initialization successfully completed, thread started", name);

            // while all normal (i.e nobody request a stop) continiously sync with server (read data)
            while (!requestStop)
            {
                try
                {
                    syncWithInterface();
                }
                catch (ErrorOnReadingException e1)
                {
                    string detail = e1.originalException != null ? e1.originalException.Message : "No inner exception";

                    //Utils.logToTimeStampedFile("Error ocured while reading data. (1." + e1.customMessage + " 2." + detail + ")", name);
                    Utils.logToTimeStampedFile(e1.customMessage, name);

                    if (!requestStop) // i.e if this indeed is an exception, during a normal flow, and nobody requested a thread stop (which migh cause read exceptions as a consequence)
                    {
                        Utils.logToTimeStampedFile("There was no external stop request, when the error occured, doing tcp client restart.", name);
                        requestStop = true;
                        requestStart = true;
                    }
                }

                Thread.Sleep(sleepInterval);
            }

            // we need to close all after execution, but the execution may be closed before/while resources were still initializing
            if (TCPSocketConnected)
            {
                myTCPClient.CloseAllLeft();
            }
            if (AdditionalInitializationOK)
            {
                ReleaseAdditionalResources();
            }

            // remember that thread is stoped
            MyTCPClientThreadRunning = false;
            Utils.logToTimeStampedFile("Thread stoped", name);
            ViewModelLocator.ControlTabStatic.updateUIButtons();

            // this serves as a restart
            if (requestStart)
            {
                Utils.logToTimeStampedFile("Restarting thread...", name);
                this.requestStop = false;
                this.requestStart = false; // we are already processing a request start event, so reset this flag

                this.MyTCPClientThread = new Thread(new ThreadStart(this.ThreadRun));
                this.MyTCPClientThread.Name = this.name;
                this.MyTCPClientThread.IsBackground = true;
                this.MyTCPClientThread.Start();
            }
        }

        /// <summary>
        /// this method empties the entire TCP buffer, cycling through it
        /// </summary>
        private void syncWithInterface()
        {
            int counter = 0;
            // read at most 100 messages at once (we assume that for 3 sec interval there might not be more,
            //even if they are, it is still OK, they just will be processed next time)
            while (counter < 100)
            {
                counter++;
                string data = myTCPClient.ReadData();
                ForwardData(data);
            }

            // below is left for testing:
            /*
             * "Sleep(0) or Yield is occasionally useful in production code for
             * advanced performance tweaks. It’s also an excellent diagnostic tool
             * for helping to uncover thread safety issues: if inserting Thread.Yield()
             * anywhere in your code makes or breaks the program, you almost certainly have a bug."*/
            Thread.Yield();
        }

        /// <summary>
        /// Left for implementing in the caller that initialized the object. Meaning: one and the same way for receiving market/order data. Different ways of processing this data
        /// </summary>
        /// <param name="data"></param>
        public abstract void ForwardData(string data);

        /// <summary>
        /// left for implementing in child classes. Its purpose is to initialize any additional resources needed for the thread to operate.
        /// If something goes wrong while getting this additional resources,
        /// an AdditionalInitialization exception should be thrown, which is than handled from the initialization phase in the caller.
        /// </summary>
        public abstract void AdditionalInitialization();

        // countrapart of AdditionalInitialization method - what is initialized should be then closed
        public abstract void ReleaseAdditionalResources();
    }

その後、必要な各 TCP 通信チャネルには、上記の抽象クラス専用の実装があり、メソッド ForwardData (つまり、このデータをどうするか) と AdditionalInitialization (つまり、特定の TCP 通信処理の前に初期化する必要があるもの) の実装を提供します。たとえば、私のスレッドでは、データを受信する前に追加のストレージスレッドを初期化する必要がありました)。

TCP 処理を閉じる以外はすべて問題ありませんでした。スレッドが終了するか続行するかを制御するために、この requestStop 変数がありましたが、Read() メソッドが継続的なブロックに陥り、requestStop 変数でさえ読み取られなくなる可能性があります (必要な 2 つの tcp チャネルはプロセスの 1 つは非常に頻繁にデータを受信し、もう 1 つは散発的にデータを受信するという点で大きく異なります)。私はまだ彼らに同じデザインを実装してもらいたいと思っています。したがって、これまで読んだことから、requestStopパラメーターを観察して実際に仕事を引き受ける別の「親」、または「制御」、または「ラッパー」スレッドを実装する必要があります。

この投稿のような解決策、またはこの投稿ようなタイマーを探しています

どんな提案でも大歓迎です。ありがとう!

4

3 に答える 3

3

私は個人的にこれに非同期ソケットを使用します:http: //msdn.microsoft.com/en-us/library/bbx2eya8.aspx

ただし、それでもブロッキング読み取りを使用したい場合は、別のスレッドからソケットをClose()するだけで済みます。

これがお役に立てば幸いです。

于 2013-03-12T10:08:05.983 に答える
2

NetworkStream の ReadAsync メソッドを呼び出し、それに CancellationToken を渡すことをお勧めします。このようにして、リクエスト停止イベントが観察されたときに、読み取り操作を (別のスレッドから) 簡単にキャンセルできます。

public class MyTCPClient : IDisposable
{
  ...
  private CancellationTokenSource cancellationTokenSource = new CancellationTokenSource ();
  ...

  public string ReadData()
  {
     ...
     byte[] dataHeader = new byte[12];
     if (this.tcpClient.Connected)
     {
         stream.ReadAsync(dataHeader, 0, 12, cancellationTokenSource.Token).Wait();
     } ...
于 2013-03-12T10:09:16.813 に答える
0

「requestStop」ブール値を設定し、別のスレッドからクライアント ソケットを閉じます。これにより、read() 呼び出しがエラー/例外とともに「early」を返します。クライアント スレッドは、read() が戻るたびに「requestStop」をチェックし、要求があればクリーンアップ/終了できます。

TBH、とにかく、そのようなクライアントを明示的にシャットダウンすることを気にすることはめったにありません。アプリが終了するまでそのままにしておきます。

于 2013-03-12T10:34:35.093 に答える