Observable.FromAsyncPattern を使用して、BeginX EndX スタイルの非同期メソッドからオブザーバブルを作成できます。
たぶん私は物事を誤解していますが、新しい非同期スタイルのメソッドからオブザーバブルを作成する同様の機能がありますか?つまり、Stream.ReadAsync?
Observable.FromAsyncPattern を使用して、BeginX EndX スタイルの非同期メソッドからオブザーバブルを作成できます。
たぶん私は物事を誤解していますが、新しい非同期スタイルのメソッドからオブザーバブルを作成する同様の機能がありますか?つまり、Stream.ReadAsync?
ToObservableを使用してIObservable<T>からを作成できます。Task<T>
using System.Reactive.Threading.Tasks;
Stream s = ...;
IObservable<int> o = s.ReadAsync(buffer, offset, count).ToObservable();
Leeの答えは正しいことに注意してください。しかし、最終的に私がやったことは、Observable.Createを使用してストリームから継続的に読み取ることでした。以下を参照してください-
    public static IConnectableObservable<Command> GetReadObservable(this CommandReader reader)
    {
       return Observable.Create<Command>(async (subject, token) =>
        {
            try
            {
                while (true)
                {
                    if (token.IsCancellationRequested)
                    {
                        subject.OnCompleted();
                        return;
                    }
                    Command cmd = await reader.ReadCommandAsync();
                    subject.OnNext(cmd);
                }
            }
            catch (Exception ex)
            {
                try
                {
                    subject.OnError(ex);
                }
                catch (Exception)
                {
                    Debug.WriteLine("An exception was thrown while trying to call OnError on the observable subject -- means you're not catching exceptions everywhere");
                    throw;
                }
            }
        }).Publish();
    }