GetMessages
Rx を使用して関数を記述する最も簡潔な方法は次のとおりです。
static void Main()
{
Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
var messages = GetMessages(socket, IPAddress.Loopback, 4000);
messages.Subscribe(x => Console.WriteLine(x));
Console.ReadKey();
}
static IObservable<string> GetMessages(Socket socket, IPAddress addr, int port)
{
var whenConnect = Observable.FromAsyncPattern<IPAddress, int>(socket.BeginConnect, socket.EndConnect)(addr, port);
// now will receive a stream of messages
// each message is prefixed with an 4 bytes/Int32 indicating it's length.
// the rest of the message is a string
// ????????????? Now What ?????????????
}
上記のサンプルのドライバーとしての単純なサーバー: http://gist.github.com/452893#file_program.cs
ソケットプログラミングに Rx を使用する場合
私が行っているいくつかのソケットプログラミング作業にReactive Extensionsを使用して調査してきました。そうする私の動機は、どういうわけかコードを「簡単」にすることです。これがコードの削減を意味するかどうか、それらの行に沿って何かをネストすることを減らすかどうか。
ただし、これまでのところそうではないようです。
- ソケットで Rx を使用する例はあまり見つかりませんでした
- 私が見つけた例は、既存
のBeginXXXX、EndXXXX コードよりも複雑ではないようです。 - FromAsyncPatternの
Observable
拡張メソッドがありますが、SocketEventArgs
これはAsync APIをカバーしていません。
現在機能していないソリューション
これが私がこれまでに持っているものです。これは機能しません。スタック オーバーフローで失敗します (heh)IObservable
指定されたバイト数を読み取る を作成できるように、セマンティクスを理解していません。
static IObservable<int> GetMessages(Socket socket, IPAddress addr, int port)
{
var whenConnect = Observable.FromAsyncPattern<IPAddress, int>(socket.BeginConnect, socket.EndConnect)(addr, port);
// keep reading until we get the first 4 bytes
byte[] buffer = new byte[1024];
var readAsync = Observable.FromAsyncPattern<byte[], int, int, SocketFlags, int>(socket.BeginReceive, socket.EndReceive);
IObservable<int> readBytes = null;
var temp = from totalRead in Observable.Defer(() => readBytes)
where totalRead < 4
select readAsync(buffer, totalRead, totalRead - 4, SocketFlags.None);
readBytes = temp.SelectMany(x => x).Sum();
var nowDoSomethingElse = readBytes.SkipUntil(whenConnect);
}