7

Reactive Extensions (Rx) を使用して、完了したタスクの列挙をバッファリングしようとしています。これを行うためのクリーンな組み込みの方法があるかどうかは誰にもわかりませんか? ToObservable拡張メソッドは、私が望むものではなく、後で使用できる を作成するIObservable<Task<T>>だけIObservable<T>ですBuffer

考案された例:

//Method designed to be awaitable
public static Task<int> makeInt()
{
     return Task.Run(() => 5);
}

//In practice, however, I don't want to await each individual task
//I want to await chunks of them at a time, which *should* be easy with Observable.Buffer 
public static void Main() 
{
    //Make a bunch of tasks
    IEnumerable<Task<int>> futureInts = Enumerable.Range(1, 100).Select(t => makeInt());

    //Is there a built in way to turn this into an Observable that I can then buffer?
    IObservable<int> buffered = futureInts.TasksToObservable().Buffer(15); //????

    buffered.Subscribe(ints => {
        Console.WriteLine(ints.Count()); //Should be 15
    });
}
4

1 に答える 1

8

の別のオーバーロードをTask使用して観察可能に変換できるという事実を使用できます。ToObservable()

(単一項目の) オブザーバブルのコレクションがある場合、 を使用して完了した項目を含む単一のオブザーバブルを作成できますMerge()

したがって、コードは次のようになります。

futureInts.Select(t => t.ToObservable())
          .Merge()
          .Buffer(15)
          .Subscribe(ints => Console.WriteLine(ints.Count));
于 2013-06-11T18:34:50.150 に答える