1

私はRxに少し慣れていないので、それを通して自分の道を学んでいます。私はそこにたくさんの例をチェックしましたが、どれも私のニーズに合いません。

シナリオ:1つのソケットサーバーソケットがあります(TCPListenerオブジェクトではなく単純なソケットオブジェクトを使用して作成されています)。このサーバーソケットに複数のクライアント(TCPClientではなくクライアントソケット)が接続されています。BeginReceiveおよびEndReceive非同期操作に「FromAsyncPattern」演算子を使用して、Rx(リアクティブ拡張)を使用してクライアントソケットから送信されたメッセージを取得しようとしています。

バッファ内のメッセージの取得をほぼ開始しました。ただし、問題は、バッファが前の操作から受信した同じメッセージを返すか、現在と前の受信操作からのコンテンツが混在することです。

使用されるコードは次のとおりです。

    private void ReceiveMessageFromClient(Socket socket)
    {
        try
        {
            if (socket != null)
            {
                byte[] buffer = new byte[400];

                var functionReceiveSocketData = Observable.FromAsyncPattern
                                                            <byte[], int, int, SocketFlags, int>(socket.BeginReceive, socket.EndReceive);

                Observable.Defer(() =>
                                functionReceiveSocketData(buffer, 0, 400, SocketFlags.None)
                                ).Repeat().Subscribe(onNext: x => OnMessageReceived(buffer));
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
        }
    }

OnMessageReceivedメソッドは、メッセージ受信イベントを発生させ、さらに処理するためにバッファーを他のクラスに送信するための単なるデリゲートです。

問題: メッセージを取得するたびに同じバッファが使用されます。以前に受信したバッファからメッセージをクリーンに受信するにはどうすればよいですか。

-メッセージを部分的に受信し、次のメッセージが実際にはこの同じメッセージの一部である場合はどうすればよいですか。

素晴らしいと思われるいくつかのコードスニペットで上記の問題を解決するのを手伝ってください。この質問を投稿する理由は、これまでに行った実装でTCPListenerを使用しているためであり、ここでの制約はソケットオブジェクトを使用することです:(

前もって感謝します。

4

1 に答える 1

2

発生している問題は、オブザーバブルへの繰り返しの呼び出し間でバッファーの同じインスタンスを共有していることです。毎回新しいバッファインスタンスがあることを確認する必要があります。また、戻り整数に基づいてバッファを切り捨てる必要があるようです。これが私があなたのif声明に入れる必要があることを提案するものです:

var functionReceiveSocketData =
    Observable.FromAsyncPattern<byte[], int, int, SocketFlags, int>
        (socket.BeginReceive, socket.EndReceive);

Func<byte[], int, byte[]> copy = (bs, n) =>
{
    var rs = new byte[n];
    bs.CopyTo(rs, 0);
    return rs;
};

Observable
    .Defer(() =>
    {
        var buffer = new byte[400];
        return 
            from n in functionReceiveSocketData(buffer, 0, 400, SocketFlags.None)
            select copy(buffer, n);
    })
    .Repeat()
    .Subscribe(x => OnMessageReceived(x));

編集:コメントに応じて、メッセージ部分を再結合します。また、上記の解決策を修正しました-ラムダの関数にあるべきで0あり、ないはずです。nCopyTocopy

私はソケットの使用の専門家ではありませんが、ソケットは単なるバイトのストリームであるため(間違っている場合は修正してください)、ストリーム内の各メッセージがいつ完了したかを判断できる必要があります。

これを行うための1つの可能な方法は次のとおりです。

/* include `functionReceiveSocketData` & `copy` from above */ =

Func<byte[], byte[], byte[]> append = (bs1, bs2) =>
{
    var rs = new byte[bs1.Length + bs2.Length];
    bs1.CopyTo(rs, 0);
    bs2.CopyTo(rs, bs1.Length);
    return rs;
};

var messages = new Subject<byte[]>();
messages.Subscribe(x => OnMessageReceived(x));

Observable
    .Defer(() => /* as above */)
    .Repeat()
    .Scan(new byte[] { }, (abs, bs) =>
    {
        var current = append(abs, bs);
        if (isCompleteMessage(current))
        {
            messages.OnNext(current);
            current = new byte[] { };
        }
        return current;
    })
    .Subscribe();

あなたがする必要がある唯一のことは、isCompleteMessage関数を実装する方法を理解することです。

于 2011-07-28T11:35:21.433 に答える