17

以下をParallel.ForEachに変換するにはどうすればよいですか?

public async void getThreadContents(String[] threads)
{
    HttpClient client = new HttpClient();
    List<String> usernames = new List<String>();
    int i = 0;

    foreach (String url in threads)
    {
        i++;
        progressLabel.Text = "Scanning thread " + i.ToString() + "/" + threads.Count<String>();
        HttpResponseMessage response = await client.GetAsync(url);
        String content = await response.Content.ReadAsStringAsync();
        String user;
        Predicate<String> userPredicate;
        foreach (Match match in regex.Matches(content))
        {
            user = match.Groups[1].ToString();
            userPredicate = (String x) => x == user;
            if (usernames.Find(userPredicate) != user)
            {
                usernames.Add(match.Groups[1].ToString());
            }
        }
        progressBar1.PerformStep();
    }
}

非同期処理と並列処理は同じであると想定してコーディングしましたが、そうではないことに気づきました。私はこれについて見つけることができるすべての質問を調べました、そして私は本当に私のためにそれをする例を見つけることができないようです。それらのほとんどは、読み取り可能な変数名を欠いています。何が含まれているのかを説明していない1文字の変数名を使用することは、例を述べるための恐ろしい方法です。

私は通常、threads(フォーラムスレッドへのURLを含む)という名前の配列に300から2000のエントリを持っており、並列処理(多くのHTTPリクエストのため)が実行を高速化するように見えます)。

Parallel.ForEachを使用する前に、すべての非同期を削除する必要がありますか(foreachの外部では非同期はなく、変数定義のみ)?どうすればこれを行うことができますか?メインスレッドをブロックせずにこれを行うことはできますか?

ちなみに私は.NET4.5を使用しています。

4

4 に答える 4

16

非同期処理と並列処理が同じであると仮定してコーディングしました

非同期処理と並列処理はまったく異なります。違いがわからない場合は、まずそれについてもっと読む必要があると思います(たとえば、c#での非同期プログラミングと並列プログラミングの関係は何ですか?)。

さて、あなたがしたいことは実際にはそれほど単純ではありません。なぜなら、特定の程度の並列性で大きなコレクションを非同期的に処理したいからです(8)。同期処理では、Parallel.ForEach()ParallelOptions並列度を構成するために)を使用できますが、で機能する単純な代替手段はありませんasync

コードでは、すべてがUIスレッドで実行されることを期待しているため、これは複雑です。(理想的には、計算からUIに直接アクセスしないでください。代わりに、を使用する必要があります。IProgressこれは、コードをUIスレッドで実行する必要がなくなることを意味します。)

おそらく、.Net 4.5でこれを行う最良の方法は、TPLデータフローを使用することです。それActionBlockはあなたが望むことを正確に行いますが、それは非常に冗長になる可能性があります(あなたが必要とするものよりも柔軟性があるため)。したがって、ヘルパーメソッドを作成することは理にかなっています。

public static Task AsyncParallelForEach<T>(
    IEnumerable<T> source, Func<T, Task> body,
    int maxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
    TaskScheduler scheduler = null)
{
    var options = new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = maxDegreeOfParallelism
    };
    if (scheduler != null)
        options.TaskScheduler = scheduler;

    var block = new ActionBlock<T>(body, options);

    foreach (var item in source)
        block.Post(item);

    block.Complete();
    return block.Completion;
}

あなたの場合、あなたはそれをこのように使うでしょう:

await AsyncParallelForEach(
    threads, async url => await DownloadUrl(url), 8,
    TaskScheduler.FromCurrentSynchronizationContext());

ここで、DownloadUrl()async Task単一のURL(ループの本体)を処理するメソッドであり8、並列度(実際のコードではリテラル定数であってはならない)でありFromCurrentSynchronizationContext()、コードがUIスレッドで実行されることを確認します。

于 2013-02-03T15:47:39.133 に答える
10

Stephen Toubには、の実装に関する優れたブログ投稿がありForEachAsyncます。Svickの答えは、Dataflowが利用可能なプラットフォームに非常に適しています。

TPLのパーティショナーを使用した別の方法を次に示します。

public static Task ForEachAsync<T>(this IEnumerable<T> source,
    int degreeOfParallelism, Func<T, Task> body)
{
  var partitions = Partitioner.Create(source).GetPartitions(degreeOfParallelism);
  var tasks = partitions.Select(async partition =>
  {
    using (partition) 
      while (partition.MoveNext()) 
        await body(partition.Current); 
  });
  return Task.WhenAll(tasks);
}

その後、これを次のように使用できます。

public async Task getThreadContentsAsync(String[] threads)
{
  HttpClient client = new HttpClient();
  ConcurrentDictionary<String, object> usernames = new ConcurrentDictionary<String, object>();

  await threads.ForEachAsync(8, async url =>
  {
    HttpResponseMessage response = await client.GetAsync(url);
    String content = await response.Content.ReadAsStringAsync();
    String user;
    foreach (Match match in regex.Matches(content))
    {
      user = match.Groups[1].ToString();
      usernames.TryAdd(user, null);
    }
    progressBar1.PerformStep();
  });
}
于 2013-02-03T22:54:01.147 に答える
3

さらに別の方法は、SemaphoreSlimorを使用することですAsyncSemaphore(これは私のAsyncExライブラリに含まれており、より多くのプラットフォームをサポートしていますSemaphoreSlim):

public async Task getThreadContentsAsync(String[] threads)
{
  SemaphoreSlim semaphore = new SemaphoreSlim(8);
  HttpClient client = new HttpClient();
  ConcurrentDictionary<String, object> usernames = new ConcurrentDictionary<String, object>();

  await Task.WhenAll(threads.Select(async url =>
  {
    await semaphore.WaitAsync();
    try
    {
      HttpResponseMessage response = await client.GetAsync(url);
      String content = await response.Content.ReadAsStringAsync();
      String user;
      foreach (Match match in regex.Matches(content))
      {
        user = match.Groups[1].ToString();
        usernames.TryAdd(user, null);
      }
      progressBar1.PerformStep();
    }
    finally
    {
      semaphore.Release();
    }
  }));
}
于 2013-02-03T23:04:40.563 に答える
0

AsyncEnumerator NuGetパッケージParallelForEachAsyncから拡張メソッドを試すことができます:

using System.Collections.Async;

public async void getThreadContents(String[] threads)
{
    HttpClient client = new HttpClient();
    List<String> usernames = new List<String>();
    int i = 0;

    await threads.ParallelForEachAsync(async url =>
    {
        i++;
        progressLabel.Text = "Scanning thread " + i.ToString() + "/" + threads.Count<String>();
        HttpResponseMessage response = await client.GetAsync(url);
        String content = await response.Content.ReadAsStringAsync();
        String user;
        Predicate<String> userPredicate;
        foreach (Match match in regex.Matches(content))
        {
            user = match.Groups[1].ToString();
            userPredicate = (String x) => x == user;
            if (usernames.Find(userPredicate) != user)
            {
                usernames.Add(match.Groups[1].ToString());
            }
        }

        // THIS CALL MUST BE THREAD-SAFE!
        progressBar1.PerformStep();
    },
    maxDegreeOfParallelism: 8);
}
于 2016-08-26T21:25:01.627 に答える