7

基本的に、私は非常に高速で、小さく、堅牢になるように設計されたシステム用のコードを書いています。私はいくつかの非同期の例から始めて、基本的に私のプロジェクトの複数の場所で使用されるサーバーTcpListenerTcpClientクライアントのクラスを書きました。

基本的に、私のサーバー クラス (コードは後述します) はすべてイベント ベースであり、クライアント コードも同様です。サーバーまたはクライアントのソケットからパケットを 1 つずつ受信すると、すべて正常に動作します。

ただし、送信側 (たとえば、クラス A がクライアント クラスを使用する) の場合、TCP ストリームを介してクラス B のサーバー クラスに一連のパケットを送信します。当然、サーバー クラスはすべてのパケットを 1 つの大きな一括収集として取得する可能性があります。そのため、データ受信イベントのコールバックが発生すると、バッファを取得して処理します。

そして、ここで面白いことが起こります。私の問題は、大きなバッファからすべてのパケットを分割していません。私の問題は、何らかの理由で理解できないことです.クライアントからサーバーに(またはその逆に)5つのパケットを送信し、反対側が5つすべてを取得するとしましょう.datarecieveイベントがトリガーされ、バグが取得され、5つすべてパケットが入っています。それらは処理されます。しかし、イベントは再びトリガーされます..

言い換えると、イベントが 1 回トリガーされる代わりに、5 つの個別のパケットに対して 5 回トリガーされ、これらの 5 つのパケットを含むバッファーを 5 回処理することになります。

そして、私は分散ネットワークを設計しているので、モジュールが通信するノード (モジュール (クライアント クラス) <--> ノード (サーバー クラス) <--> クライアント (クライアント クラス)) は 5 ではなく 25 パケットを取得することを意味します。そして、それらを宛先に転送し、宛先は 25*5 または 125 パケットを取得します。

ここで明らかな何かが欠けていると確信しています。イベントを一度だけ発生させる方法を考えてみました.そして、タオルを投げて、サーバーとクライアントのクラスを書き直して、それらが同期し、クライアントインスタンスごとに(またはサーバー側で)スレッドを持つようにするかもしれません. 、受け入れるスレッド、およびクライアント接続ごとのスレッド) - そうすれば、データフローをより適切に処理できます。つまり、パケット全体がプロセスに入ってきます。そうでない場合は、通常の開始/終了特殊バイトなどを使用して、完全になるのを待ちます。

サーバークラス - そのほとんどがそこにあります。KillClient などの無関係なものをいくつか削除しました。

   public class Server
{
    private TcpListener serverListener;
    private List<ServerClient> clients;

    #region Callbacks


    public delegate void incomingDataCallback(byte[] buffer, string clientID, TcpClient tcpClient);
    public incomingDataCallback incomingData = null;

    public delegate void incomingConnectionCallback(string clientID, TcpClient tcpClient);
    public incomingConnectionCallback incomingConnection = null;

    public delegate void connectionClosedCallback(string clientID, TcpClient tcpClient);
    public connectionClosedCallback connectionClosed = null;

    public delegate void dataWrittenCallback(string clientID, TcpClient tcpClient);
    public dataWrittenCallback dataWritten = null;


    #endregion

    // Constructor
    public Server(string listenIP, int listenPort)
    {
        // Create a new instance of serverlistener.
        serverListener = new TcpListener(IPAddress.Parse(listenIP), listenPort);
        this.clients = new List<ServerClient>();
        this.Encoding = Encoding.Default;
    }

    ~Server()
    {
        // Shut down the server.
        this.Stop();
    }

    public Encoding Encoding { get; set; }

    public IEnumerable<TcpClient> TcpClients
    {
        get
        {
            foreach (ServerClient client in this.clients)
            {
                yield return client.TcpClient;
            }
        }
    }

    public IEnumerable<TcpClient> TcpClients
    {
        get
        {
            foreach (ServerClient client in this.clients)
            {
                yield return client.TcpClient;
            }
        }
    }

    public void Stop()
    {
        this.serverListener.Stop();
        lock (this.clients)
        {
            foreach (ServerClient client in this.clients)
            {
                client.TcpClient.Client.Disconnect(false);
                if (connectionClosed != null)
                    connectionClosed(client.ID, client.TcpClient);
            }
            this.clients.Clear();
        }
    }

    public void WriteToClient(TcpClient tcpClient, byte[] bytes)
    {
        NetworkStream networkStream = tcpClient.GetStream();

        try
        {
            networkStream.BeginWrite(bytes, 0, bytes.Length, WriteCallback, tcpClient);
        }
        catch (System.IO.IOException ex)
        {
            // Port was closed before data could be written. 
            // So remove this guy from clients.
            lock (this.clients)
            {
                foreach (ServerClient cl in clients)
                {
                    if (cl.TcpClient.Equals(tcpClient))
                    {
                        this.clients.Remove(cl);
                        if (connectionClosed != null)
                            connectionClosed(cl.ID, cl.TcpClient);
                        break;
                    }
                }
            }

        }
    }

    private void WriteCallback(IAsyncResult result)
    {
        TcpClient tcpClient = result.AsyncState as TcpClient;
        NetworkStream networkStream = tcpClient.GetStream();
        networkStream.EndWrite(result);

        // Get the ID and return it
        //ServerClient client = result.AsyncState as ServerClient;

        //string ipaddr = ((IPEndPoint)(tcpClient.Client.RemoteEndPoint)).Address.ToString();
        string port = ((IPEndPoint)(tcpClient.Client.RemoteEndPoint)).Port.ToString();


        Console.WriteLine("Write callback called for: " + port);

        //                if (dataWritten != null)
        //                  dataWritten(client.ID, tcpClient);
    }

    private void AcceptTcpClientCallback(IAsyncResult result)
    {
        TcpClient tcpClient;

        try
        {
            tcpClient = serverListener.EndAcceptTcpClient(result);
        }
        catch
        {
            // Often get this error when shutting down the server
            return;
        }

        NetworkStream networkStream = tcpClient.GetStream();
        byte[] buffer = new byte[tcpClient.ReceiveBufferSize];

        // Get the IP Address.. this will be used for id purposes. 
        string ipaddr = ((IPEndPoint)(tcpClient.Client.RemoteEndPoint)).Address.ToString();
        string port = ((IPEndPoint)(tcpClient.Client.RemoteEndPoint)).Port.ToString();

        // Create a client object for this client.
        ServerClient client = new ServerClient(tcpClient, buffer, ipaddr + ":" + port);

        Console.WriteLine("Data availiable: " + networkStream.DataAvailable.ToString());
        Console.WriteLine("Amount of data: " + tcpClient.Available.ToString());

        // Lock the list and add it in.
        lock (this.clients)
        {
            this.clients.Add(client);
        }

        if (networkStream.DataAvailable)
        {

            int read = networkStream.Read(client.Buffer, 0, client.Buffer.Length);
            Console.WriteLine("Calling ReadHandle directly with " + read.ToString() + " number of bytes. for clientid: " + client.ID);
            ReadHandle(client, read, networkStream);

        }
        else
        {

            Console.WriteLine("Started beginRead for client in accept connection: " + client.ID);
            networkStream.BeginRead(client.Buffer, 0, client.Buffer.Length, ReadCallback, client);
            //networkStream.

            Console.WriteLine("Data availiable: " + networkStream.DataAvailable.ToString());
            Console.WriteLine("Amount of data: " + tcpClient.Available.ToString());
        }

        Console.WriteLine("Starting BeginAcceptTcpClient again - client: " + client.ID);
        serverListener.BeginAcceptTcpClient(AcceptTcpClientCallback, null);

        // Notify owner that new connection came in
        if (incomingConnection != null)
            incomingConnection(client.ID, tcpClient);
    }

    private void ReadCallback(IAsyncResult result)
    {
        ServerClient client = result.AsyncState as ServerClient;


        if (client == null)
        {
            Console.WriteLine("ReadCallback: Null client");
            return;
        }

        int read = 0;

        NetworkStream networkStream = client.NetworkStream;
        try
        {
            read = networkStream.EndRead(result);
        }
        catch (System.IO.IOException ex)
        {
            Console.WriteLine("ReadCallback: Exception occured during reading.. Message: " + ex.Message + " client " + client.ID);
            lock (this.clients)
            {
                this.clients.Remove(client);
                if (connectionClosed != null)
                    connectionClosed(client.ID, client.TcpClient);
                return;
            }

        }

        ReadHandle(client, read, networkStream);
    }

    private void ReadHandle(ServerClient client, int read, NetworkStream networkStream)
    {



        // If zero bytes read, then client disconnected.
        if (read == 0)
        {
            Console.WriteLine("ReadHandle: Read == 0, closing connection for Client: " + client.ID);
            lock (this.clients)
            {
                this.clients.Remove(client);
                if (connectionClosed != null)
                    connectionClosed(client.ID, client.TcpClient);
                return;
            }
        }

        //string data = this.Encoding.GetString(client.Buffer, 0, read);

        // Do something with the data object here.
        if (incomingData != null)
            incomingData(client.Buffer, client.ID, client.TcpClient);

        // Go back to accepting data from client.
        try
        {
          networkStream.BeginRead(client.Buffer, 0, client.Buffer.Length, ReadCallback, client);
            Console.WriteLine("ReadHandle: BeginRead called for client " + client.ID);
        }
        catch (Exception ex)
        {
            // Damn, we just lost the client.
            Console.WriteLine("ReadHandle: Exception occured during trying to BeginRead.. Message: " + ex.Message + " client " + client.ID);
            lock (this.clients)
            {
                this.clients.Remove(client);
                if (connectionClosed != null)
                    connectionClosed(client.ID, client.TcpClient);
                return;
            }
        }

    }
}

internal class ServerClient
{
    public ServerClient(TcpClient tcpClient, byte[] buffer, string ipaddr)
    {
        if (tcpClient == null) throw new ArgumentNullException("tcpClient");
        if (buffer == null) throw new ArgumentNullException("tcpClient");
        if (ipaddr == null) throw new ArgumentNullException("tcpClient");

        this.TcpClient = tcpClient;
        this.Buffer = buffer;
        this.ID = ipaddr;
    }

    public TcpClient TcpClient { get; private set; }
    public byte[] Buffer { get; private set; }
    public string ID { get; private set; }
    public NetworkStream NetworkStream
    {
        get
        {
            return TcpClient.GetStream();
        }
    }
}
}

そして、これがクライアント クラスです。これは、サーバーに比べて小さくて単純です。

public class Client
{
    private IPAddress address;
    private int port;
    private string ID;

    //private WaitHandle addressSet;
    private TcpClient tcpClient;
    private int failedConnectionCount;

    public bool keepOnTrying = false;

    #region Callbacks

    public delegate void incomingDataCallback(byte[] buffer, string serverID);
    public incomingDataCallback incomingData = null;


    public delegate void connectedCallback(string serverID);
    public connectedCallback clientConnected = null;

    public delegate void connectionFailedCallback(string serverID);
    public connectionFailedCallback clientConnectionFailed = null;

    public delegate void connectionClosedCallback(string serverID);
    public connectionClosedCallback connectionClosed = null;

    public delegate void dataWrittenCallback(string serverID);
    public dataWrittenCallback dataWritten = null;

    #endregion

    public Client(IPAddress address, int port)
    {
        this.address = address;

        if (port < 0) throw new ArgumentException();

        this.port = port;
        this.tcpClient = new TcpClient();
        this.Encoding = Encoding.Default;
        this.ID = address.ToString() + ":" + port.ToString();

        tcpClient.ReceiveBufferSize = 16384;
        tcpClient.SendBufferSize = 16384;
    }

    // Destructor
    ~Client()
    {
        this.Disconnect();
    }

    public Encoding Encoding { get; set; }


    public void Connect()
    {
        tcpClient.BeginConnect(address, port, ConnectCallback, null);
    }

    public void Disconnect()
    {
        tcpClient.Close();
        if (connectionClosed != null)
            connectionClosed(ID);
    }

    public void Write(byte[] bytes)
    {
        NetworkStream networkStream = tcpClient.GetStream();

        networkStream.BeginWrite(bytes, 0, bytes.Length, WriteCallback, null);
    }

    private void WriteCallback(IAsyncResult result)
    {
        NetworkStream networkStream = tcpClient.GetStream();

        if (tcpClient.Connected)
        {
            networkStream.EndWrite(result);
        }

        if (dataWritten != null)
            dataWritten(ID);
    }

    private void ConnectCallback(IAsyncResult result)
    {
        // Check to see if connected successfully or not. If we didnt, then the try/catch block will increment
        // the failed connection count.
        try
        {
            tcpClient.EndConnect(result);
        }
        catch
        {
            Interlocked.Increment(ref failedConnectionCount);
            if (keepOnTrying)
                tcpClient.BeginConnect(address, port, ConnectCallback, null);

            if (clientConnectionFailed != null)
                clientConnectionFailed(ID);

            return;
        }

        // Connected successfully.
        // Now begin async read operation.

        NetworkStream networkStream = tcpClient.GetStream();
        byte[] buffer = new byte[tcpClient.ReceiveBufferSize];
        networkStream.BeginRead(buffer, 0, buffer.Length, ReadCallback, buffer);

        if (clientConnected != null)
            clientConnected(ID);
    }

    private void ReadCallback(IAsyncResult result)
    {
        int read;
        NetworkStream networkStream;

        try
        {
            networkStream = tcpClient.GetStream();
            read = networkStream.EndRead(result);

        }
        catch
        {
            // An error has occured when reading.. -.-
            Console.WriteLine("Error occured while reading for ID: " + ID);
            return;
        }



        // If read is 0, then connection was closed

        if (read == 0)
        {
            if (connectionClosed != null)
                connectionClosed(ID);
            return;
        }

        if (result.IsCompleted == false)
        {
            Console.WriteLine("Uh oh ");
        }

        byte[] buffer = result.AsyncState as byte[];

        if (incomingData != null)
            incomingData(buffer, ID);

        // Then begin reading again.
        networkStream.BeginRead(buffer, 0, buffer.Length, ReadCallback, buffer);
    }

}

そして、これらのクラスを使用する方法は次のとおりです。

  1. クラスを作成し、サーバーまたはクライアントのいずれかのオブジェクトを作成します。
  2. すべてのコールバックを結び付けます。つまり、コールバックごとにクラスに関数を作成します。
  3. サーバーの起動またはクライアントの接続を呼び出します。どちらを使用しているかによって異なります。

したがって、私の問題を再現するには、次のようにします。

  1. あるプログラムでサーバー クラスを作成し、別のプログラムでクライアントを作成します。クライアントをサーバーに接続します。
  2. 入ってくるデータのコールバックを行います。シリアライゼーションを使用しているので、同様のことができます。
  3. クライアントに大量のデータを一度にサーバーに送信させます。私の場合、モジュールで JSON データを独自の形式に変換してから、それをサーバーに送信します。したがって、サーバーは一度に大量のパケットを取得します。
  4. サーバーが受信バッファーにすべてのパケットを取得し、incomingDataCallback を呼び出すたびに、すべてのパケットを含むバッファーがあることを確認する必要があります。そして、受信したすべてのパケットに対してそれを呼び出します。バイトではなく、パケット全体。

したがって、コードを書き直して同期させ、スレッドで実行する前に、次のようにします。

  1. データが入ってきたときにそのようにするために、別の/より良いことができることはありますか-イベントを1回呼び出して、バッファ内のすべてのパケットを処理できます-または-
  2. 呼び出される他のイベントが最初のイベントと同じバッファを共有しないようにする方法はありますか? プロセッサー時間の浪費であることはわかっていますが、incomingDataCallback ハンドラーに「最初の 10 バイトが 00 の場合は戻ります」という行を含めることができます。そのため、最初のイベントでバッファをすべて無効にし、後続のイベントでそれらを検出することを考えていました。

更新: Servy のコメントにより、これらのクラスの使用方法は次のとおりです。すべてを c/p するのではなく、関連する部分だけを c/p します。

Node - Server クラスを使用します。

class ModuleClient
{
    private List<ModuleClientInfo> clients = new List<ModuleClientInfo>();
    private Server myServer = null;

    public ModuleClient()
    {
        // create a server object
        myServer = new Server("127.0.0.1", 9000);

        // Attach callbacks
        myServer.connectionClosed = connClosed;
        myServer.dataWritten = dataWritten;
        myServer.incomingConnection = incomingConn;
        myServer.incomingData = incomingData;
    }

    public void startListeningForModules()
    {
        if (!listeningForModules)
            myServer.Start();
        else
            return;

        listeningForModules = true;
    }

    private void incomingData(byte[] buffer, string clientID, TcpClient tcpClient)
    {
        Console.WriteLine("Incoming Data from " + clientID);

        incomingPacketStruct newPacket = new incomingPacketStruct();
        newPacket.clientID = clientID;
        newPacket.buffer = buffer;
        newPacket.tcpClient = tcpClient;
    }

バッファに 5 つのパケットがあり、incomingData が 5 回呼び出されていることに気付いたのは、incomingData です。

ここで、クライアントのincomingDataについて(発信データでこの動作に気付いていないことに注意してください。また、関連性もありません。一度に10個のjsonパケットを取得したとしましょう。それらをノードに送信します。つまり、10回の書き込みです。ノードはそれらをすべて同じバッファに取得し、サーバーの着信データを 10 回呼び出し、そのたびに 10 個のパケットを確認します。

クライアントの受信データ:

public partial class Program : ServiceBase
{
   // Globals
    private static SocketObject.Client myHermesClient = null;
    private static JSONInterface myJsonInterface = null;

    private static void mainThread(object data)
    {

        // Take care of client and callbacks..
        myHermesClient = new SocketObject.Client(System.Net.IPAddress.Parse("127.0.0.1"), 9000);
        myHermesClient.connectionClosed = hermesConnectionClosed;
        myHermesClient.clientConnected = hermesConnected;
        myHermesClient.dataWritten = hermesDataWritten;
        myHermesClient.incomingData = hermesIncomingData;
        myHermesClient.clientConnectionFailed = hermesConnectionFailed;

        myHermesClient.keepOnTrying = true;

        // Begin async connect
        myHermesClient.Connect();


        // Main loop for service.
        while (serviceRunning)
        {
            Thread.Sleep(500);
        }

    }

    #region Hermes Client Code
    private static void hermesIncomingData(byte[] buffer, string serverID)
    {

    }

そしてまた、同じこと。サーバーが大量のデータをクライアントに送り返すとき..中断してバッファを見ると、私が話していることがわかります。

さて、これを明確にしたい..私の問題は、パケットを分割していません。私はコードを持っています(独自のものであり、これとは関係がないため含まれていません-バッファを変更せず、そこからオブジェクトのリストを作成するだけです)-しかし、問題は上記のようにコールバックが複数回呼び出されることです。

4

2 に答える 2

1

非同期ネットワークのコードは十分にトリッキーです。すべてのコードがなければ、おそらくこれを解決することは不可能です。私はいくつかの考えで答えます:

  1. 使用しているライブラリはしっかりしています。TcpClient、TcpListener、または NetworkStream に問題があるとは思えません。
  2. 同じバッファを何度も再利用しています。メソッドはデータをミラーリングしているようには見えませincomingData()んが、同じ基になるバッファーを再利用しています。しかし、繰り返しますが、何もしないので、この動作を変更する可能性のある関数を明らかに切り取っています。
  3. Write で呼び出したデータとまったく同じチャンクがパケットに含まれているとは限りません。「パケットが完全な場合は処理します。そうでない場合は、さらにデータが入ってくるのを待ちます」ということを行う必要があります。これは通常、BeginRead に指定したバッファーからデータを取り出し、それを別のバッファーにコピーすることを意味します。
  4. さらに多くのデバッグ情報が必要になります。1 人のコメンターが WireShark を提案しましたが、私もその推奨事項を繰り返します。これは、クライアントの問題とサーバー側の問題を明確にするのに役立ちます。同様に、ReadHandle への各呼び出しでワイヤから取り出されたバイト数をログに記録する必要があります。

本当に、もっとデバッグ情報が必要です。これにより、それぞれが 5 ブロックのデータを含む 5 つのパケットを取得しているように見える理由が明確になるはずです。

于 2014-05-08T22:37:39.293 に答える
1

中身private void ReadCallback(IAsyncResult result)

ReadHandle(client, read, networkStream); 

次にReadHandle()、コールバックを再度セットアップします。

于 2013-05-03T12:08:19.900 に答える