Observable.FromAsyncPattern を使用して、BeginX EndX スタイルの非同期メソッドからオブザーバブルを作成できます。
たぶん私は物事を誤解していますが、新しい非同期スタイルのメソッドからオブザーバブルを作成する同様の機能がありますか?つまり、Stream.ReadAsync?
Observable.FromAsyncPattern を使用して、BeginX EndX スタイルの非同期メソッドからオブザーバブルを作成できます。
たぶん私は物事を誤解していますが、新しい非同期スタイルのメソッドからオブザーバブルを作成する同様の機能がありますか?つまり、Stream.ReadAsync?
ToObservableを使用してIObservable<T>
からを作成できます。Task<T>
using System.Reactive.Threading.Tasks;
Stream s = ...;
IObservable<int> o = s.ReadAsync(buffer, offset, count).ToObservable();
Leeの答えは正しいことに注意してください。しかし、最終的に私がやったことは、Observable.Createを使用してストリームから継続的に読み取ることでした。以下を参照してください-
public static IConnectableObservable<Command> GetReadObservable(this CommandReader reader)
{
return Observable.Create<Command>(async (subject, token) =>
{
try
{
while (true)
{
if (token.IsCancellationRequested)
{
subject.OnCompleted();
return;
}
Command cmd = await reader.ReadCommandAsync();
subject.OnNext(cmd);
}
}
catch (Exception ex)
{
try
{
subject.OnError(ex);
}
catch (Exception)
{
Debug.WriteLine("An exception was thrown while trying to call OnError on the observable subject -- means you're not catching exceptions everywhere");
throw;
}
}
}).Publish();
}