リアクティブ拡張機能を使用して、ストリームで Read メソッドを継続的に呼び出し、結果をオブザーバーに伝達する Observable を作成するにはどうすればよいですか?
それとも、これは物事へのアプローチの完全に間違った方法ですか? 独自の IObservable を実装する必要がありますか?
リアクティブ拡張機能を使用して、ストリームで Read メソッドを継続的に呼び出し、結果をオブザーバーに伝達する Observable を作成するにはどうすればよいですか?
それとも、これは物事へのアプローチの完全に間違った方法ですか? 独自の IObservable を実装する必要がありますか?
自分のオブザーバブルを実装することが理にかなっている状況に遭遇したことはありません。
代わりにこれを試してください:
public static IObservable<byte[]> ObservableRead(Stream stream, int bufferSize)
{
return Observable.Create<byte[]>(o =>
{
var buffer = new byte[bufferSize];
var read = 0;
try
{
while (true)
{
read = stream.Read(buffer, 0, buffer.Length);
if (read == 0)
{
break;
}
var results = buffer.Take(read).ToArray();
//Always return a copy
//never the buffer for concurrency's sake.
o.OnNext(results);
}
}
catch (Exception ex)
{
o.OnError(ex);
}
finally
{
o.OnCompleted();
}
return Disposable.Empty;
});
}