223

メトロアプリでは、いくつかのWCF呼び出しを実行する必要があります。かなりの数の呼び出しが行われる必要があるため、並列ループで呼び出す必要があります。問題は、WCF呼び出しがすべて完了する前に並列ループが終了することです。

期待どおりに機能するように、これをどのようにリファクタリングしますか?

var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
var customers = new  System.Collections.Concurrent.BlockingCollection<Customer>();

Parallel.ForEach(ids, async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    var cust = await repo.GetCustomer(i);
    customers.Add(cust);
});

foreach ( var customer in customers )
{
    Console.WriteLine(customer.ID);
}

Console.ReadKey();
4

11 に答える 11

196

背後にある全体的な考え方Parallel.ForEach()は、一連のスレッドがあり、各スレッドがコレクションの一部を処理するということです。お気づきのとおり、これはasync-awaitでは機能しません。非同期呼び出しの期間中、スレッドを解放する必要があります。

スレッドをブロックすることでそれを「修正」することもできますが、それは-ForEach()の全体的なポイントを打ち負かします。asyncawait

できることは、非同期を十分にサポートするの代わりにTPLデータフローを使用することです。Parallel.ForEach()Task

具体的には、ラムダを使用してTransformBlock各IDをに変換するを使用してコードを記述できます。このブロックは、並行して実行するように構成できます。そのブロックを、それぞれをコンソールに書き込むブロックにリンクします。ブロックネットワークを設定した後、各IDをに設定できます。CustomerasyncActionBlockCustomerPost()TransformBlock

コード内:

var ids = new List<string> { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };

var getCustomerBlock = new TransformBlock<string, Customer>(
    async i =>
    {
        ICustomerRepo repo = new CustomerRepo();
        return await repo.GetCustomer(i);
    }, new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
    });
var writeCustomerBlock = new ActionBlock<Customer>(c => Console.WriteLine(c.ID));
getCustomerBlock.LinkTo(
    writeCustomerBlock, new DataflowLinkOptions
    {
        PropagateCompletion = true
    });

foreach (var id in ids)
    getCustomerBlock.Post(id);

getCustomerBlock.Complete();
writeCustomerBlock.Completion.Wait();

TransformBlockおそらく、の並列処理をいくつかの小さな定数に制限したいでしょうが。また、コレクションが大きすぎる場合など、の容量を制限し、TransformBlockを使用して非同期でアイテムを追加することもできます。SendAsync()

コードと比較した場合の追加の利点(機能した場合)は、1つの項目が終了するとすぐに書き込みが開始され、すべての処理が終了するまで待たないことです。

于 2012-07-19T16:32:41.523 に答える
146

svickの答えは(いつものように)素晴らしいです。

ただし、実際に転送するデータが大量にある場合は、Dataflowの方が便利です。asyncまたは、互換性のあるキューが必要な場合。

あなたの場合、より簡単な解決策は、asyncスタイルの並列処理を使用することです。

var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };

var customerTasks = ids.Select(i =>
  {
    ICustomerRepo repo = new CustomerRepo();
    return repo.GetCustomer(i);
  });
var customers = await Task.WhenAll(customerTasks);

foreach (var customer in customers)
{
  Console.WriteLine(customer.ID);
}

Console.ReadKey();
于 2012-07-19T16:47:05.793 に答える
96

svickが提案したようにDataFlowを使用するのはやり過ぎかもしれません。また、Stephenの答えは、操作の並行性を制御する手段を提供していません。ただし、それはかなり簡単に達成できます。

public static async Task RunWithMaxDegreeOfConcurrency<T>(
     int maxDegreeOfConcurrency, IEnumerable<T> collection, Func<T, Task> taskFactory)
{
    var activeTasks = new List<Task>(maxDegreeOfConcurrency);
    foreach (var task in collection.Select(taskFactory))
    {
        activeTasks.Add(task);
        if (activeTasks.Count == maxDegreeOfConcurrency)
        {
            await Task.WhenAny(activeTasks.ToArray());
            //observe exceptions here
            activeTasks.RemoveAll(t => t.IsCompleted); 
        }
    }
    await Task.WhenAll(activeTasks.ToArray()).ContinueWith(t => 
    {
        //observe exceptions in a manner consistent with the above   
    });
}

リストのToArray()代わりに配列を使用し、完了したタスクを置き換えることで呼び出しを最適化できますが、ほとんどのシナリオで大きな違いが生じるとは思えません。OPの質問ごとの使用例:

RunWithMaxDegreeOfConcurrency(10, ids, async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    var cust = await repo.GetCustomer(i);
    customers.Add(cust);
});

EDITFellowSOユーザーとTPLwizEli Arbelが、StephenToubの関連記事を教えてくれました。いつものように、彼の実装はエレガントで効率的です。

public static Task ForEachAsync<T>(
      this IEnumerable<T> source, int dop, Func<T, Task> body) 
{ 
    return Task.WhenAll( 
        from partition in Partitioner.Create(source).GetPartitions(dop) 
        select Task.Run(async delegate { 
            using (partition) 
                while (partition.MoveNext()) 
                    await body(partition.Current).ContinueWith(t => 
                          {
                              //observe exceptions
                          });
                      
        })); 
}
于 2014-09-16T19:37:14.860 に答える
52

質問が最初に投稿された4年前には存在しなかった新しいAsyncEnumeratorNuGetパッケージを使用すると、労力を節約できます。これにより、並列度を制御できます。

using System.Collections.Async;
...

await ids.ParallelForEachAsync(async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    var cust = await repo.GetCustomer(i);
    customers.Add(cust);
},
maxDegreeOfParallelism: 10);

免責事項:私はオープンソースでMITの下でライセンスされているAsyncEnumeratorライブラリの作成者であり、コミュニティを支援するためだけにこのメッセージを投稿しています。

于 2017-06-19T20:28:54.000 に答える
17

をにラップしParallel.Foreach、キーワードTask.Run()の代わりに使用しますawait[yourasyncmethod].Result

(UIスレッドをブロックしないようにTask.Runを実行する必要があります)

このようなもの:

var yourForeachTask = Task.Run(() =>
        {
            Parallel.ForEach(ids, i =>
            {
                ICustomerRepo repo = new CustomerRepo();
                var cust = repo.GetCustomer(i).Result;
                customers.Add(cust);
            });
        });
await yourForeachTask;
于 2014-11-18T11:55:43.480 に答える
8

これは非常に効率的で、TPLデータフロー全体を機能させるよりも簡単です。

var customers = await ids.SelectAsync(async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    return await repo.GetCustomer(i);
});

...

public static async Task<IList<TResult>> SelectAsync<TSource, TResult>(this IEnumerable<TSource> source, Func<TSource, Task<TResult>> selector, int maxDegreesOfParallelism = 4)
{
    var results = new List<TResult>();

    var activeTasks = new HashSet<Task<TResult>>();
    foreach (var item in source)
    {
        activeTasks.Add(selector(item));
        if (activeTasks.Count >= maxDegreesOfParallelism)
        {
            var completed = await Task.WhenAny(activeTasks);
            activeTasks.Remove(completed);
            results.Add(completed.Result);
        }
    }

    results.AddRange(await Task.WhenAll(activeTasks));
    return results;
}
于 2014-12-05T21:48:55.777 に答える
8

SemaphoreSlimを利用し、最大の並列度を設定できるこの拡張方法

    /// <summary>
    /// Concurrently Executes async actions for each item of <see cref="IEnumerable<typeparamref name="T"/>
    /// </summary>
    /// <typeparam name="T">Type of IEnumerable</typeparam>
    /// <param name="enumerable">instance of <see cref="IEnumerable<typeparamref name="T"/>"/></param>
    /// <param name="action">an async <see cref="Action" /> to execute</param>
    /// <param name="maxDegreeOfParallelism">Optional, An integer that represents the maximum degree of parallelism,
    /// Must be grater than 0</param>
    /// <returns>A Task representing an async operation</returns>
    /// <exception cref="ArgumentOutOfRangeException">If the maxActionsToRunInParallel is less than 1</exception>
    public static async Task ForEachAsyncConcurrent<T>(
        this IEnumerable<T> enumerable,
        Func<T, Task> action,
        int? maxDegreeOfParallelism = null)
    {
        if (maxDegreeOfParallelism.HasValue)
        {
            using (var semaphoreSlim = new SemaphoreSlim(
                maxDegreeOfParallelism.Value, maxDegreeOfParallelism.Value))
            {
                var tasksWithThrottler = new List<Task>();

                foreach (var item in enumerable)
                {
                    // Increment the number of currently running tasks and wait if they are more than limit.
                    await semaphoreSlim.WaitAsync();

                    tasksWithThrottler.Add(Task.Run(async () =>
                    {
                        await action(item).ContinueWith(res =>
                        {
                            // action is completed, so decrement the number of currently running tasks
                            semaphoreSlim.Release();
                        });
                    }));
                }

                // Wait for all tasks to complete.
                await Task.WhenAll(tasksWithThrottler.ToArray());
            }
        }
        else
        {
            await Task.WhenAll(enumerable.Select(item => action(item)));
        }
    }

使用例:

await enumerable.ForEachAsyncConcurrent(
    async item =>
    {
        await SomeAsyncMethod(item);
    },
    5);
于 2018-05-09T22:46:16.807 に答える
6

私はパーティーに少し遅れていますが、GetAwaiter.GetResult()を使用して、同期コンテキストで非同期コードを実行することを検討してください。

 Parallel.ForEach(ids, i =>
{
    ICustomerRepo repo = new CustomerRepo();
    // Run this in thread which Parallel library occupied.
    var cust = repo.GetCustomer(i).GetAwaiter().GetResult();
    customers.Add(cust);
});
于 2016-11-30T16:30:38.583 に答える
5

一連のヘルパーメソッドを導入すると、次の単純な構文で並列クエリを実行できるようになります。

const int DegreeOfParallelism = 10;
IEnumerable<double> result = await Enumerable.Range(0, 1000000)
    .Split(DegreeOfParallelism)
    .SelectManyAsync(async i => await CalculateAsync(i).ConfigureAwait(false))
    .ConfigureAwait(false);

ここで何が起こるかというと、ソースコレクションを10個のチャンクに分割し(.Split(DegreeOfParallelism))、次に10個のタスクを実行してそれぞれのアイテムを1つずつ処理し(.SelectManyAsync(...))、それらを1つのリストにマージします。

より簡単なアプローチがあることを言及する価値があります:

double[] result2 = await Enumerable.Range(0, 1000000)
    .Select(async i => await CalculateAsync(i).ConfigureAwait(false))
    .WhenAll()
    .ConfigureAwait(false);

ただし、注意が必要です。ソースコレクションが大きすぎる場合は、Taskすべてのアイテムに対してすぐにスケジュールが設定されるため、パフォーマンスが大幅に低下する可能性があります。

上記の例で使用されている拡張メソッドは、次のようになります。

public static class CollectionExtensions
{
    /// <summary>
    /// Splits collection into number of collections of nearly equal size.
    /// </summary>
    public static IEnumerable<List<T>> Split<T>(this IEnumerable<T> src, int slicesCount)
    {
        if (slicesCount <= 0) throw new ArgumentOutOfRangeException(nameof(slicesCount));

        List<T> source = src.ToList();
        var sourceIndex = 0;
        for (var targetIndex = 0; targetIndex < slicesCount; targetIndex++)
        {
            var list = new List<T>();
            int itemsLeft = source.Count - targetIndex;
            while (slicesCount * list.Count < itemsLeft)
            {
                list.Add(source[sourceIndex++]);
            }

            yield return list;
        }
    }

    /// <summary>
    /// Takes collection of collections, projects those in parallel and merges results.
    /// </summary>
    public static async Task<IEnumerable<TResult>> SelectManyAsync<T, TResult>(
        this IEnumerable<IEnumerable<T>> source,
        Func<T, Task<TResult>> func)
    {
        List<TResult>[] slices = await source
            .Select(async slice => await slice.SelectListAsync(func).ConfigureAwait(false))
            .WhenAll()
            .ConfigureAwait(false);
        return slices.SelectMany(s => s);
    }

    /// <summary>Runs selector and awaits results.</summary>
    public static async Task<List<TResult>> SelectListAsync<TSource, TResult>(this IEnumerable<TSource> source, Func<TSource, Task<TResult>> selector)
    {
        List<TResult> result = new List<TResult>();
        foreach (TSource source1 in source)
        {
            TResult result1 = await selector(source1).ConfigureAwait(false);
            result.Add(result1);
        }
        return result;
    }

    /// <summary>Wraps tasks with Task.WhenAll.</summary>
    public static Task<TResult[]> WhenAll<TResult>(this IEnumerable<Task<TResult>> source)
    {
        return Task.WhenAll<TResult>(source);
    }
}
于 2017-11-17T15:38:22.813 に答える
2

.NET 6アップデート:Parallel.ForEachAsync APIの導入後、以下の実装は関連しなくなりました。これらは、.NET6より古いバージョンの.NETプラットフォームを対象としているプロジェクトにのみ役立ちます。


これは、TPL DataflowライブラリのForEachAsyncに基づいた、メソッドの単純な一般的な実装であり、現在は.NET5プラットフォームに組み込まれています。ActionBlock

public static Task ForEachAsync<T>(this IEnumerable<T> source,
    Func<T, Task> action, int dop)
{
    // Arguments validation omitted
    var block = new ActionBlock<T>(action,
        new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = dop });
    try
    {
        foreach (var item in source) block.Post(item);
        block.Complete();
    }
    catch (Exception ex) { ((IDataflowBlock)block).Fault(ex); }
    return block.Completion;
}

このソリューションは、提供されたを熱心に列挙IEnumerableし、そのすべての要素をすぐにに送信しますActionBlock。そのため、要素数が非常に多い列挙型にはあまり適していません。以下は、ソースを遅延的に列挙し、その要素をActionBlock1つずつ送信する、より洗練されたアプローチです。

public static async Task ForEachAsync<T>(this IEnumerable<T> source,
    Func<T, Task> action, int dop)
{
    // Arguments validation omitted
    var block = new ActionBlock<T>(action, new ExecutionDataflowBlockOptions()
    { MaxDegreeOfParallelism = dop, BoundedCapacity = dop });
    try
    {
        foreach (var item in source)
            if (!await block.SendAsync(item).ConfigureAwait(false)) break;
        block.Complete();
    }
    catch (Exception ex) { ((IDataflowBlock)block).Fault(ex); }
    try { await block.Completion.ConfigureAwait(false); }
    catch { block.Completion.Wait(); } // Propagate AggregateException
}

これらの2つのメソッドは、例外の場合の動作が異なります。AggregateException最初の¹は、例外を含むをそのInnerExceptionsプロパティに直接伝播します。AggregateException2つ目は、例外を除いて別のものを含むを伝播しAggregateExceptionます。個人的には、2番目の方法の動作は、実際にはより便利だと思います。これを待つと、ネストのレベルが自動的に排除されるため、ブロック内を簡単catch (AggregateException aex)に処理できます。最初の方法では、ブロック内にアクセスできるように、待機する前にを保存する必要があります。非同期メソッドからの例外の伝播の詳細については、こちらまたはこちらをご覧ください。aex.InnerExceptionscatchTasktask.Exception.InnerExceptionscatch

どちらの実装も、の列挙中に発生する可能性のあるエラーを適切に処理しますsourceForEachAsync保留中のすべての操作が完了する前に、メソッドは完了しません。監視されないままのタスクはありません(ファイアアンドフォーゲット方式で)。

¹最初の実装では、非同期と待機が不要になります。

于 2020-12-11T13:07:35.913 に答える
-1

TPLなしの簡単なネイティブの方法:

int totalThreads = 0; int maxThreads = 3;

foreach (var item in YouList)
{
    while (totalThreads >= maxThreads) await Task.Delay(500);
    Interlocked.Increment(ref totalThreads);

    MyAsyncTask(item).ContinueWith((res) => Interlocked.Decrement(ref totalThreads));
}

次のタスクでこのソリューションを確認できます。

async static Task MyAsyncTask(string item)
{
    await Task.Delay(2500);
    Console.WriteLine(item);
}
于 2021-05-09T21:53:36.160 に答える