9

TCP メッセージ フレーミングを処理する方法の例をあらゆる場所で探しています。NetworkStreams が StreamReader または StreamWriter オブジェクトに渡され、'\n' で区切られたメッセージに ReadLine または WriteLine メソッドを使用する例を数多く見てきました。私のアプリケーション プロトコルには「\n」で終わるメッセージが含まれているため、NetworkStream が適しているようです。ただし、これらすべてを非同期ソケットと組み合わせて処理する適切な方法に関する具体的な例は見つかりません。以下で ReceiveCallback() が呼び出された場合、NetworkStream および StreamReader クラスを実装してメッセージのフレーミングを処理するにはどうすればよいですか? 私が読んだことによると、1 回の受信で 1 つのメッセージの一部を取得し、次の受信で残りのメッセージ (「\n」を含む) を取得する場合があります。これは、あるメッセージの終わりと次のメッセージの一部を取得できるということですか? 確かに、これを処理する簡単な方法があるはずです。

次のコードがあります。

    private void StartRead(Socket socket)
    {
        try
        {
            StateObject state = new StateObject();
            state.AsyncSocket = socket;

            socket.BeginReceive(state.Buffer, 0, StateObject.BufferSize, 0, new AsyncCallback(ReceiveCallback), state);
        }
        catch (SocketException)
        {
            m_Socket.Shutdown(SocketShutdown.Both);
            Disconnect();
        }
    }

    private void ReceiveCallback(IAsyncResult ar)
    {
        try
        {
            StateObject state = (StateObject)ar.AsyncState;

            int bytes_read = state.AsyncSocket.EndReceive(ar);

            char[] chars = new char[bytes_read + 1];
            System.Text.Decoder decoder = System.Text.Encoding.UTF8.GetDecoder();
            int charLength = decoder.GetChars(state.Buffer, 0, bytes_read, chars, 0);

            String data = new String(chars);

            ParseMessage(data);

            StartRead(state.AsyncSocket);
        }
        catch (SocketException)
        {
            m_Socket.Shutdown(SocketShutdown.Both);
            Disconnect();
        }
    }
4

3 に答える 3

3

チャンクの前に長さを付ける方が、区切り文字を使用するよりも優れています。改行を使用してデータを送信するために、エスケープを処理する必要はありません。

この回答は、 AsyncCTPの機能を使用しているため、現在は関係がない可能性があります。これは、.netの次のバージョンでのみ使用されます。しかし、それは物事をはるかに簡潔にします。基本的に、同期の場合に行うコードを正確に記述しますが、非同期呼び出しがある場合は「await」ステートメントを挿入します。

    public static async Task<Byte[]> ReadChunkAsync(this Stream me) {
        var size = BitConverter.ToUInt32(await me.ReadExactAsync(4), 0);
        checked {
            return await me.ReadExactAsync((int)size);
        }
    }

    public static async Task<Byte[]> ReadExactAsync(this Stream me, int count) {
        var buf = new byte[count];
        var t = 0;
        while (t < count) {
            var n = await me.ReadAsync(buf, t, count - t);
            if (n <= 0) {
                if (t > 0) throw new IOException("End of stream (fragmented)");
                throw new IOException("End of stream");
            }
            t += n;
        }
        return buf;
    }

    public static void WriteChunk(this Stream me, byte[] buffer, int offset, int count) {
        me.Write(BitConverter.GetBytes(count), 0, 4);
        me.Write(buffer, offset, count);
    }
于 2011-05-19T15:04:36.363 に答える
1

基本的に、バッファを作成し、データを受信するたびにそのデータをバッファに追加して、1 つ以上の完全なメッセージを既に受信しているかどうかを判断します。

との間ReceiveCallbackStartReadは非同期メッセージを受信しないため (着信データはソケット レベルで自動的にバッファリングされます)、完全なメッセージをチェックしてバッファから削除するのに理想的な場所です。

メッセージ 1 の末尾、メッセージ 2、およびメッセージ 3 の先頭をすべて 1 つのチャンクで受信するなど、すべてのバリエーションが可能です。

チャンクを UTF8 でデコードすることはお勧めしません。1 つの UTF8 文字が 2 バイトで構成されている可能性があり、それらがチャンク間で分割されるとデータが破損する可能性があるためです。その場合、byte[]-buffer ( MemoryStream?) を保持し、0x0A バイトでメッセージを分割できます。

于 2011-05-19T12:46:25.323 に答える
0

OK、これが私がやったことです。ネットワーク ストリームに基づいて NetworkStream と StreamReader を作成するリーダー スレッドを作成しました。次に、StreamReader.ReadLine を使用して、そのように行を読み取ります。これは同期呼び出しですが、独自のスレッドにあります。それははるかにうまくいくようです。これはアプリケーションのプロトコル (改行区切りのメッセージ) であるため、これを実装する必要がありました。私が行ったように、他の人が答えを求めて地獄のように周りを見回すことを私は知っています:

public class Client
{
    Socket              m_Socket;

    EventWaitHandle     m_WaitHandle;
    readonly object     m_Locker;
    Queue<IEvent>       m_Tasks;
    Thread              m_Thread;

    Thread              m_ReadThread;

    public Client()
    {
        m_WaitHandle = new AutoResetEvent(false);
        m_Locker = new object();
        m_Tasks = new Queue<IEvent>();

        m_Thread = new Thread(Run);
        m_Thread.IsBackground = true;
        m_Thread.Start();
    }

    public void EnqueueTask(IEvent task)
    {
        lock (m_Locker)
        {
            m_Tasks.Enqueue(task);
        }

        m_WaitHandle.Set();
    }

    private void Run()
    {
        while (true)
        {
            IEvent task = null;

            lock (m_Locker)
            {
                if (m_Tasks.Count > 0)
                {
                    task = m_Tasks.Dequeue();

                    if (task == null)
                    {
                        return;
                    }
                }
            }

            if (task != null)
            {
                task.DoTask(this);
            }
            else
            {
                m_WaitHandle.WaitOne();
            }
        }
    }

    public void Connect(string hostname, int port)
    {
        try
        {
            m_Socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);

            IPAddress[] IPs = Dns.GetHostAddresses(hostname);

            m_Socket.BeginConnect(IPs, port, new AsyncCallback(ConnectCallback), m_Socket);
        }
        catch (SocketException)
        {
            m_Socket.Close();
            OnConnect(false, "Unable to connect to server.");
        }
    }

    private void ConnectCallback(IAsyncResult ar)
    {
        try
        {
            Socket socket = (Socket)ar.AsyncState;

            socket.EndConnect(ar);

            OnConnect(true, "Successfully connected to server.");

            m_ReadThread = new Thread(new ThreadStart(this.ReadThread));
            m_ReadThread.Name = "Read Thread";
            m_ReadThread.IsBackground = true;
            m_ReadThread.Start();
        }
        catch (SocketException)
        {
            m_Socket.Close();
            OnConnect(false, "Unable to connect to server.");
        }
    }

    void ReadThread()
    {
        NetworkStream networkStream = new NetworkStream(m_Socket);
        StreamReader reader = new StreamReader(networkStream);

        while (true)
        {
            try
            {
                String message = reader.ReadLine();

                // To keep the code thread-safe, enqueue a task in the CLient class thread to parse the message received.
                EnqueueTask(new ServerMessageEvent(message));
            }
            catch (IOException)
            {
                // The code will reach here if the server disconnects from the client. Make sure to cleanly shutdown...
                Disconnect();
                break;
            }
        }
    }

    ... Code for sending/parsing the message in the Client class thread.
}
于 2011-05-19T22:47:32.593 に答える