8

asyncストリームを読み取り、何かが見つかったときにイベントを発生させる、長時間実行されるメソッドであるメソッドがあります。

public static async void GetStream(int id, CancellationToken token)

新しいタスクで作成されるため、キャンセル トークンが必要です。内部的にはawait、ストリームを読み取るときに呼び出します:

var result = await sr.ReadLineAsync()

ここで、これを IObservable<> を返すメソッドに変換して、これをリアクティブ拡張で使用できるようにします。私が読んだことから、これを行う最善の方法は を使用することですObservable.Create.RX 2.0は非同期もサポートするようになったので、次のようなものですべてを動作させることができます:

public static IObservable<Message> ObservableStream(int id, CancellationToken token)
{
    return Observable.Create<Message>(
        async (IObserver<Message> observer) =>
            {

内部の残りのコードは同じですが、イベントを発生させる代わりにobserver.OnNext(). しかし、これは間違っているように感じます。1つには、CancellationTokensをそこに混ぜています.asyncキーワードを追加すると機能しましたが、これは実際に行うのが最善ですか? ObservableStream を次のように呼び出しています。

Client.ObservableStream(555404, token).ObserveOn(Dispatcher.CurrentDispatcher).SubscribeOn(TaskPoolScheduler.Default).Subscribe(m => Messages.Add(m));
4

2 に答える 2

1

void ではなく Task を返すように GetStream を変更する必要があります (svick がコメントしたように、絶対に必要な場合を除いて、async void を返すことは適切ではありません)。Task を返したら、.ToObservable() を呼び出すだけで完了です。

例えば:

public static async Task<int> GetStream(int id, CancellationToken token) { ... }

それで、

GetStream(1, new CancellationToken(false))
   .ToObservable()
   .Subscribe(Console.Write);
于 2013-07-04T14:10:02.697 に答える