6

一部のデータをバッファリングするために Reactive Extensions (Rx) を使用しています。ただし、このデータと非同期で何かを行う必要があるという点で問題がありますが、非同期操作が完了するまでバッファーが次のグループを通過させたくないのです。

コードを 2 つの方法で構成しようとしました (不自然な例):

public async Task processFiles<File>(IEnumerable<File> files)
{
    await files.ToObservable()
        .Buffer(10)
        .SelectMany(fi => fi.Select(f => upload(f)) //Now have an IObservable<Task>
        .Select(t => t.ToObservable())
        .Merge()
        .LastAsync();
}

public Task upload(File item)
{
    return Task.Run(() => { //Stuff });
}

また

public async Task processFiles<File>(IEnumerable<File> files)
{
    var buffered = files.ToObservable()
        .Buffer(10);

    buffered.Subscribe(async files => await Task.WhenAll(files.Select(f => upload(f)));

    await buffered.LastAsync();
}

public Task upload(File item)
{
    return Task.Run(() => { //Stuff });
}

残念ながら、非同期操作が完了する前にバッファーが次のグループをプッシュするため、これらの方法はどちらも機能しませんでした。その意図は、バッファリングされた各グループを非同期的に実行し、その操作が完了したときにのみ、次のバッファリングされたグループに進むことです。

どんな助けでも大歓迎です。

4

2 に答える 2

2

まず、各グループの項目を並行して実行する必要があると思いますが、各グループが連続して実行されるのは非常に珍しいことです。より一般的な要件は、項目を並行して実行することですが、同時に最大で n 個の項目を実行することです。この方法では、グループが固定されていないため、1 つのアイテムに時間がかかりすぎても、他のアイテムはそれを待つ必要がありません。

あなたが求めていることを行うには、TPL Dataflow が Rx よりも適していると思います (ただし、一部の Rx コードは引き続き役立ちます)。TPL Dataflow は、デフォルトで一連の処理を実行する「ブロック」を中心としています。これはまさに必要なものです。

コードは次のようになります。

public static class Extensions
{
    public static Task ExecuteInGroupsAsync<T>(
         this IEnumerable<T> source, Func<T, Task> func, int groupSize)
     {
         var block = new ActionBlock<IEnumerable<T>>(
             g => Task.WhenAll(g.Select(func)));
         source.ToObservable()
               .Buffer(groupSize)
               .Subscribe(block.AsObserver());
         return block.Completion;
     }
}

public Task ProcessFiles(IEnumerable<File> files)
{
    return files.ExecuteInGroupsAsync(Upload, 10);
}

これにより、面倒な作業のほとんどがActionBlockRx に残されます。データフロー ブロックは Rx オブザーバー (およびオブザーバブル) として機能できるため、それを利用して を使い続けることができますBuffer()

グループ全体を一度に処理したいので、 を使用して、グループ全体が完了したときに完了するTask.WhenAll()を作成します。Taskデータフロー ブロックはTask-return 関数を理解するため、次のグループはTask、前のグループによって返された関数が完了するまで実行を開始しません。

最終結果は でありCompletion Task、ソース オブザーバブルが完了し、すべての処理が完了すると完了します。

TPL Dataflow にも がありBatchBlock、これは同様に機能し、コレクションから各項目をBuffer()直接( and を使用せずに) 使用できますが、コードのこの部分に Rx を使用すると、より簡単になると思います。Post()ToObservable()AsObserver()

編集:実際には、ここでは TPL データフローはまったく必要ありません。James Worldが提案したように使用ToEnumerable()するだけで十分です:

public static async Task ExecuteInGroupsAsync<T>(
     this IEnumerable<T> source, Func<T, Task> func, int groupSize)
{
    var groups = source.ToObservable().Buffer(groupSize).ToEnumerable();
    foreach (var g in groups)
    {
        await Task.WhenAll(g.Select(func));
    }
}

あるいは、Rx を使わずに morelinq を使ってさらにBatch()シンプル:

public static async Task ExecuteInGroupsAsync<T>(
    this IEnumerable<T> source, Func<T, Task> func, int groupSize)
{
    var groups = source.Batch(groupSize);
    foreach (var group in groups)
    {
        await Task.WhenAll(group.Select(func));
    }
}
于 2013-06-13T12:32:29.450 に答える