3

処理するバッチのリストがあります。永遠に。
各チャンク (5) を並行して実行し、完了したら次のチャンクに移動します。
何らかの理由で、次のコードはチャンクが完了するのを待機せず、完了していなくても続行します。

while (true)
{
    foreach (string[] urlsArr in chunks)
    { 
        int i = 0;
        foreach (var url in urlsArr)
        {
            ThreadPool.QueueUserWorkItem(x =>
            {
                ProccessUrl(url, config, drivers[i]);
                _resetEvent.Set();
                i++;
            });
        }
        _resetEvent.WaitOne();// this is not really waiting.
    }
}
4

3 に答える 3

1

Semaphoreまたはそれのスリムバージョンをご覧ください。セマフォを使用すると、常に 5 つのスレッドのみを実行できます。実行中のスレッドのいずれかが終了すると、新しい作業を開始できます。これは、特にワークロードが不均一な場合に、より効率的です。1 つのアイテムの処理に 1 時間かかり、他の 4 つのアイテムの処理に 1 秒かかる状況を考えてみましょう。この場合、4 つのスレッドは最後のスレッドが終了するのを待ってから、他の作業を開始します。

例については、「SemaphoreSlim の使用法を理解する必要がある」を参照してください。

コードの問題は、待機ハンドルが 1 つとスレッドが 5 つしかないことです。実行中の 5 つのスレッドのいずれかが作業を終了すると、待機ハンドルが設定されるため、外側のループが続行され、別の 5 つのスレッドが開始されます。ここまでで、内側のループの最初の 4 つのスレッドが完了し、それらのいずれかが待機ハンドルを再度設定できる可能性があります。さて、ここに問題があると思いますか?

Hans のコメントによると、単一のバッチ内の作業項目間に依存関係がある場合、次のバッチを開始する前にすべての作業項目を完了する必要がある場合は、以下を確認する必要がありますCountDownEvent

于 2015-04-06T09:54:37.587 に答える
1

これはタスク(非同期/待機)を含むバージョンです

while (true)
        {
            foreach (string[] urlsArr in chunks)
            {
                Task[] tasks = new Task[urlsArr.Length];
                for (int i = 0; i < urlsArr.Length; i++)
                {
                    var url = urlsArr[i];
                    var driver = drivers[i];
                    tasks[i] = Task.Run(() => { ProccessUrl(url, config, driver); });
                }

                await Task.WhenAll(tasks);
            }
        }

スレッドセーフな方法でインクリメントされなかった元のコードの「i」変数の問題も修正されることに注意してください(Interlocked.Incrementで修正できます)。

コードがそうでないasync場合は、タスクがスレッドで完了するのを待つことができます (ただし、これはブロックされています)。

Task.WhenAll(tasks).Wait();
于 2015-04-06T10:58:47.310 に答える
0

Parallel.ForEach()おそらく全体を単純化し、スレッドの管理と同時実行度を 5 に制限することの両方に活用できると思います。

次のサンプル コードを実行すると、同時スレッドの数が 5 に制限されているため、偽の URL が 5 つのチャンクで処理されていることがわかります。

このようにすれば、独自のチャンク ロジックは必要ありません。

using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication2
{
    class Program
    {
        static void Main()
        {
            // Make some pretend URLs for this demo.

            string[] urls = Enumerable.Range(1, 100).Select(n => n.ToString()).ToArray();

            // Use Parallel.ForEach() along with MaxDegreeOfParallelism = 5 to process
            // these using 5 threads maximum:

            Parallel.ForEach(
                urls,
                new ParallelOptions{MaxDegreeOfParallelism = 5},
                processUrl
            );
        }

        static void processUrl(string url)
        {
            Console.WriteLine("Processing " + url);
            Thread.Sleep(1000);
            Console.WriteLine("Processed " + url);
        }
    }
}
于 2015-04-06T10:21:27.437 に答える