ReadUntilClosedObservable1を使用してNetworkStreamからデータを読み取ると、読み取られたデータの一部のブロックがオーバーラップするように、返されたデータが破損します。
ただし、ReadUntilClosedObservable2を使用してデータを読み取ると、データは問題なく到着します。
ReadUntilClosedObservable2のストリームから繰り返し読み取ると、CPUが消費されるため、ReadUntilClosedObservable1を使用したいと思います。
メッセージを同期順に取得するにはどうすればよいですか?
アップデート:
return Observable.Timer(TimeSpan.Zero, interval, TaskPoolScheduler.Default)
.SelectMany(_ => readToEnd)
.Where(dataChunk => dataChunk.Length > 0);
readToEnd
toが前の仕事を終える前に何度も何度も解雇されることに気づきました。同期する必要はありませんか?問題がある場合Observable.Timer
、それなしで同じ効果を達成するにはどうすればよいですか?間隔を置いて読みますが、待たずに開始しますか?
public static IObservable<int> ReadObservable(this Stream stream, byte[] buffer
,int offset, int count)
{
return stream.ReadAsync(buffer, offset, count)
.ToObservable();
}
public static IObservable<byte[]> ReadObservable(this Stream stream,
int bufferSize)
{
var buffer = new byte[bufferSize];
return stream.ReadObservable(buffer, 0, buffer.Length)
.Select(cbRead =>
{
if (cbRead == 0)
{
return new byte[0];
}
if (cbRead == buffer.Length)
{
return buffer;
}
var dataChunk = new byte[cbRead];
Buffer.BlockCopy(buffer, 0, dataChunk,
0, cbRead);
return dataChunk;
});
}
public static IObservable<byte[]> ReadUntilClosedObservable1(this NetworkStream
stream, int bufferSize, TimeSpan interval)
{
var readToEnd = Observable.Defer(() => stream.ReadObservable(bufferSize))
.DoWhile(() => stream.DataAvailable)
.ToList()
.Select(dataChunks =>
{
var buffer = new List<byte>();
foreach (var dataChunk in dataChunks)
{
buffer.AddRange(dataChunk);
}
return buffer.ToArray();
});
return Observable.Timer(TimeSpan.Zero, interval, TaskPoolScheduler.Default)
.SelectMany(_ => readToEnd)
.Where(dataChunk => dataChunk.Length > 0);
}
public static IObservable<byte[]> ReadUntilClosedObservable2(this Stream stream
,int bufferSize)
{
return Observable.Defer(() => stream.ReadObservable(bufferSize))
.Repeat()
.Where(dataChunk => dataChunk.Length > 0);
}