7

Rx を使用して TCPClient 受信ストリームから読み取り、改行「\r\n」で区切られた文字列の IObservable にデータを解析しようとしています。以下は、ソケット ストリームから受信する方法です...

var messages = new Subject<string>();

var functionReceiveSocketData =
            Observable.FromAsyncPattern<byte[], int, int, SocketFlags, int>
            (client.Client.BeginReceive, client.Client.EndReceive);

Func<byte[], int, byte[]> copy = (bs, n) =>
        {
            var rs = new byte[buffer.Length];
            bs.CopyTo(rs, 0);
            return rs;
        };

Observable
    .Defer(() =>
            {
                var buffer = new byte[50];
                return
                    from n in functionReceiveSocketData(buffer, 0, buffer.Length, SocketFlags.None)
                select copy(buffer, n);
            }).Repeat().Subscribe(x => messages.OnNext(System.Text.Encoding.UTF8.GetString(x)));

これが、文字列を解析するために思いついたものです。これは現在機能していません...

obsStrings = messages.Buffer<string,string>(() =>  
                messages.Scan((a, c) => a + c).SkipWhile(a => !a.Contains("\r\n"))
            );

メッセージの件名はメッセージをチャンクで受信するので、メッセージを連結して、連結された文字列に改行が含まれているかどうかをテストし、バッファを閉じてバッファリングされたチャンクを出力するように通知します。なぜ機能しないのかわかりません。obsStrings から最初のチャンクだけを取得しているようです。

だから私は2つのことを探しています。io ストリームの読み取りを簡素化し、メッセージの件名の使用を排除したいと考えています。次に、文字列の解析を機能させたいと考えています。私はこれを少しハッキングしてきましたが、実用的な解決策を思い付くことができません。Rx初心者です。

編集:問題が解決された後の完成品は次のとおりです....

var receivedStrings = socket.ReceiveUntilCompleted(SocketFlags.None)
            .SelectMany(x => System.Text.Encoding.UTF8.GetString(x).ToCharArray())
            .Scan(String.Empty, (a, b) => (a.EndsWith("\r\n") ? "" : a) + b)
            .Where(x => x.EndsWith("\r\n"))
            .Select(buffered => String.Join("", buffered))
            .Select(a => a.Replace("\n", ""));

「ReceiveUntilCompleted」は、RXX プロジェクトの拡張機能です。

4

2 に答える 2

3
messages
    .Scan(String.Empty, (a, b) => (a.EndsWith("\r\n") ? "" : a) + b)
    .Where(x => x.EndsWith("\r\n"))
于 2012-05-02T21:22:08.713 に答える
1

の代わりにSubscribe、を使用してSubject、あなたはただ試すことができますSelect

.Repeat().Select(x => System.Text.Encoding.UTF8.GetString(x));

これがすべてと呼ばれる新しいオブザーバブルに入ったと仮定するとmessages、次の問題はこの行にあることです

var obsStrings = messages.Buffer<string,string>(() =>  
                messages.Scan((a, c) => a + c).SkipWhile(a => !a.Contains("\r\n"))
            );

あなたはとの両方Bufferを使用Scanしていて、両方で同じことをしようとしています!Bufferクロージングセレクターが必要であることに注意してください。

あなたが本当に欲しいのは:

var obsStrings = messages.Buffer(() => messages.Where(x => x.Contains("\r\n")))
                         .Select(buffered => String.Join(buffered));

これにより、ウィンドウを閉じるタイミング(\ r \ nが含まれている場合)に関してBufferedに観察可能が与えられ、連結するバッファリングされた量をSelectに与えられます。これにより、分割された文字列の新しいオブザーバブルが生成されます。

1つの問題は、チャンクの途中に新しい行が残っている可能性があり、これにより問題が発生することです。簡単なアイデアの1つは、次のような完全な文字列チャンクではなく、文字を観察することです。

obsStrings.Repeat().SelectMany(x => System.Text.Encoding.UTF8.GetString(x).ToCharArray().ToObservable());

次に、バッファをmessages.Where(c => c != '\r')スキップして次のように変更できます。\r

var obsStrings = messages.Buffer(() => messages.Where(x => x == '\n')))
                         .Select(buffered => String.Join("", buffered));
于 2012-05-01T21:30:35.520 に答える