Rx を使用して TCPClient 受信ストリームから読み取り、改行「\r\n」で区切られた文字列の IObservable にデータを解析しようとしています。以下は、ソケット ストリームから受信する方法です...
var messages = new Subject<string>();
var functionReceiveSocketData =
Observable.FromAsyncPattern<byte[], int, int, SocketFlags, int>
(client.Client.BeginReceive, client.Client.EndReceive);
Func<byte[], int, byte[]> copy = (bs, n) =>
{
var rs = new byte[buffer.Length];
bs.CopyTo(rs, 0);
return rs;
};
Observable
.Defer(() =>
{
var buffer = new byte[50];
return
from n in functionReceiveSocketData(buffer, 0, buffer.Length, SocketFlags.None)
select copy(buffer, n);
}).Repeat().Subscribe(x => messages.OnNext(System.Text.Encoding.UTF8.GetString(x)));
これが、文字列を解析するために思いついたものです。これは現在機能していません...
obsStrings = messages.Buffer<string,string>(() =>
messages.Scan((a, c) => a + c).SkipWhile(a => !a.Contains("\r\n"))
);
メッセージの件名はメッセージをチャンクで受信するので、メッセージを連結して、連結された文字列に改行が含まれているかどうかをテストし、バッファを閉じてバッファリングされたチャンクを出力するように通知します。なぜ機能しないのかわかりません。obsStrings から最初のチャンクだけを取得しているようです。
だから私は2つのことを探しています。io ストリームの読み取りを簡素化し、メッセージの件名の使用を排除したいと考えています。次に、文字列の解析を機能させたいと考えています。私はこれを少しハッキングしてきましたが、実用的な解決策を思い付くことができません。Rx初心者です。
編集:問題が解決された後の完成品は次のとおりです....
var receivedStrings = socket.ReceiveUntilCompleted(SocketFlags.None)
.SelectMany(x => System.Text.Encoding.UTF8.GetString(x).ToCharArray())
.Scan(String.Empty, (a, b) => (a.EndsWith("\r\n") ? "" : a) + b)
.Where(x => x.EndsWith("\r\n"))
.Select(buffered => String.Join("", buffered))
.Select(a => a.Replace("\n", ""));
「ReceiveUntilCompleted」は、RXX プロジェクトの拡張機能です。