28

Rx サブスクリプション内で非同期関数をコールバックしたいと考えています。

例:

public class Consumer
{
    private readonly Service _service = new Service();

    public ReplaySubject<string> Results = new ReplaySubject<string>();

    public void Trigger()
    {
        Observable.Timer(TimeSpan.FromMilliseconds(100)).Subscribe(async _ => await RunAsync());
    }

    public Task RunAsync()
    {
        return _service.DoAsync();
    }
}

public class Service
{
    public async Task<string> DoAsync()
    {
        return await Task.Run(() => Do());
    }

    private static string Do()
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(200));
        throw new ArgumentException("invalid!");
        return "foobar";
    }
}

[Test]
public async Task Test()
{
    var sut = new Consumer();
    sut.Trigger();
    var result = await sut.Results.FirstAsync();
}

例外を適切にキャッチするには、何をする必要がありますか?

4

3 に答える 3

39

アナ・ベッツの答えはほとんどのシナリオで機能しますが、非同期関数が完了するのを待っている間にストリームをブロックしたい場合は、次のようなものが必要です:

Observable.Interval(TimeSpan.FromSeconds(1))
          .Select(l => Observable.FromAsync(asyncMethod))
          .Concat()
          .Subscribe();

または:

Observable.Interval(TimeSpan.FromSeconds(1))
          .Select(_ => Observable.Defer(() => asyncMethod().ToObservable()))
          .Concat()
          .Subscribe();
于 2015-05-04T12:45:59.243 に答える
21

これを次のように変更します。

Observable.Timer(TimeSpan.FromMilliseconds(100))
    .SelectMany(async _ => await RunAsync())
    .Subscribe();

サブスクライブは、Observable 内で非同期操作を保持しません。

于 2014-04-11T16:45:54.690 に答える
18

メソッドを作成するため、 にasyncメソッドを渡したくありません。避けるために最善を尽くしてください。Subscribeasync voidasync void

あなたの場合、シーケンスの各要素に対してメソッドを呼び出し、すべての結果をキャッシュすることが必要だと思います。asyncその場合、使用して各要素SelectManyのメソッドを呼び出し、キャッシュします (さらにボールを転がすために a を使用します)。asyncReplayConnect

public class Consumer
{
    private readonly Service _service = new Service();

    public IObservable<string> Trigger()
    {
        var connectible = Observable.Timer(TimeSpan.FromMilliseconds(100))
            .SelectMany(_ => RunAsync())
            .Replay();
        connectible.Connect();
        return connectible;
    }

    public Task<string> RunAsync()
    {
        return _service.DoAsync();
    }
}

Results代わりにメソッドから返されるようにプロパティを変更しましたTrigger。これはより理にかなっていると思うので、テストは次のようになります。

[Test]
public async Task Test()
{
    var sut = new Consumer();
    var results = sut.Trigger();
    var result = await results.FirstAsync();
}
于 2014-04-11T11:15:30.760 に答える