5

タイプのシーケンスIObservable<T>と、にマップする関数がT, CancellationTokenありますTask<U>IObservable<U>それらから抜け出す最もクリーンな方法は何ですか?

次のセマンティクスが必要です。

  • 各タスクは、前のアイテムのタスクが終了した後に開始されます
  • タスクがキャンセルまたは失敗した場合、そのタスクはスキップされます
  • 元のシーケンスの順序は厳密に保持されます

私が見たときの署名は次のとおりです。

public static IObservable<U> Select<T, U> (
    this IObservable<T> source,
    Func<T, CancellationToken, Task<U>> selector
);

私はまだコードを書いていませんが、誰かに負けない限り書きます。
いずれにせよ、私は のような演算子に慣れていないWindowため、私のソリューションはあまり洗練されていない可能性があります。

C# 4 でのソリューションが必要ですが、比較のために C# 5 の回答も歓迎します。


興味があれば、以下は多かれ少なかれ私の現実世界のシナリオです。

Dropbox.GetImagesRecursively ()
    .ObserveOn (SynchronizationContext.Current)
    .Select (DownloadImage)
    .Subscribe (AddImageToFilePicker);
4

1 に答える 1

3

これはこれまでのところうまくいくようです:

public static IObservable<U> Select<T, U> (
    this IObservable<T> source,
    Func<T, CancellationToken, Task<U>> selector)
{
    return source
        .Select (item => 
            Observable.Defer (() => 
                Observable.StartAsync (ct => selector (item, ct))
                    .Catch (Observable.Empty<U> ())
            ))
        .Concat ();
}

延期されたタスクベースの例外を飲み込むオブザーバブルを各アイテムにマップし、それらを連結します。


私の思考回路はこんな感じでした。

SelectManyオーバーロードの 1 つが、私が望んでいたことをほぼ正確に実行し、まったく同じ署名を持っていることに気付きました。しかし、それは私のニーズを満たしていませんでした:

  • 元のアイテムが出現するとタスクが作成されますが、各タスクが完了するまで待つ必要がありました
  • キャンセルされたタスクや失敗したタスクをスキップするオプションはありません

このオーバーロードの実装を調べたところ、FromAsyncタスクの作成とキャンセルを処理するために使用されていることに気付きました。

public virtual IObservable<TResult> SelectMany<TSource, TTaskResult, TResult> (IObservable<TSource> source, Func<TSource, CancellationToken, Task<TTaskResult>> taskSelector, Func<TSource, TTaskResult, TResult> resultSelector)
{
    return SelectMany_<TSource, TTaskResult, TResult> (
        source,
        x => FromAsync (ct => taskSelector (x, ct)),
        resultSelector
    );
}

私はFromAsyncそれがどのように実装されているかを確認するために目を向けました。

public virtual IObservable<TResult> FromAsync<TResult> (Func<CancellationToken, Task<TResult>> functionAsync)
{
    return Defer (() => StartAsync (functionAsync));
}

Deferとを再利用しましたが、飲み込みエラーStartAsyncも追加しました。Catchとの組み合わせによりDeferConcatタスクが互いに待機し、元の順序で開始されるようになります。

于 2013-07-12T16:02:38.373 に答える