async
ストリームを読み取り、何かが見つかったときにイベントを発生させる、長時間実行されるメソッドであるメソッドがあります。
public static async void GetStream(int id, CancellationToken token)
新しいタスクで作成されるため、キャンセル トークンが必要です。内部的にはawait
、ストリームを読み取るときに呼び出します:
var result = await sr.ReadLineAsync()
ここで、これを IObservable<> を返すメソッドに変換して、これをリアクティブ拡張で使用できるようにします。私が読んだことから、これを行う最善の方法は を使用することですObservable.Create
.RX 2.0は非同期もサポートするようになったので、次のようなものですべてを動作させることができます:
public static IObservable<Message> ObservableStream(int id, CancellationToken token)
{
return Observable.Create<Message>(
async (IObserver<Message> observer) =>
{
内部の残りのコードは同じですが、イベントを発生させる代わりにobserver.OnNext()
. しかし、これは間違っているように感じます。1つには、CancellationTokensをそこに混ぜています.asyncキーワードを追加すると機能しましたが、これは実際に行うのが最善ですか? ObservableStream を次のように呼び出しています。
Client.ObservableStream(555404, token).ObserveOn(Dispatcher.CurrentDispatcher).SubscribeOn(TaskPoolScheduler.Default).Subscribe(m => Messages.Add(m));