2

これは、ここでの私の質問のさらに進んだものです

いくつかの読書をすることによって....私はセマフォからスレッドプールに移動しました。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;

namespace ThreadPoolTest
{
    class Data
    {
        public int Pos { get; set; }
        public int Num { get; set; }
    }

    class Program
    {
        static ManualResetEvent[] resetEvents = new ManualResetEvent[20];

        static void Main(string[] args)
        {            

            int s = 0;
            for (int i = 0; i < 100000; i++)
            {                
                resetEvents[s] = new ManualResetEvent(false);
                Data d = new Data();
                d.Pos = s;
                d.Num = i;
                ThreadPool.QueueUserWorkItem(new WaitCallback(Process), (object)d);
                if (s >= 19)
                {
                    WaitHandle.WaitAll(resetEvents);
                    Console.WriteLine("Press Enter to Move forward");
                    Console.ReadLine();
                    s = 0;
                }
                else
                {
                    s = s + 1;
                }
            }
        }

        private static void Process(object o)
        {
            Data d = (Data) o;
            Console.WriteLine(d.Num.ToString());
            Thread.Sleep(10000);
            resetEvents[d.Pos].Set();
        }
    }
}

このコードは機能し、20個のセットで処理できます。しかし、WaitAllのため、このコードは好きではありません。たとえば、20のバッチを開始し、17が終了するまでに、3つのスレッドに時間がかかるとします。それでも、WaitAllがあるため、17個のスレッドを待機状態のままにします。

WaitAnyは良かったでしょう...しかし、プールを効率的に使用するために、スタック、リスト、キューなどの多くの制御構造を構築する必要があるのはかなり厄介なようです。

私が気に入らないもう1つの点は、resetEventsのクラス内のグローバル変数全体です。この配列は、Processメソッドとメインループの間で共有する必要があるためです。

上記のコードは機能します...しかし、私はそれを改善するためにあなたの助けが必要です。

繰り返しますが...私は.NET2.0VS2008を使用しています。.NET4.0並列/非同期フレームワークを使用できません。

4

2 に答える 2

3

これを行うにはいくつかの方法があります。上記の投稿に基づいて、おそらく最も簡単なのは次のとおりです。

const int MaxThreads = 4;
const int ItemsToProcess = 10000;
private Semaphore _sem = new Semaphore(MaxThreads, MaxThreads);

void DoTheWork()
{
    int s = 0;
    for (int i = 0; i < ItemsToProcess; ++i)
    {
        _sem.WaitOne();
        Data d = new Data();
        d.Pos = s;
        d.Num = i;
        ThreadPool.QueueUserWorkItem(Process, d);
        ++s;
        if (s >= 19)
            s = 0;
    }

    // All items have been assigned threads.
    // Now, acquire the semaphore "MaxThreads" times.
    // When counter reaches that number, we know all threads are done.
    int semCount = 0;
    while (semCount < MaxThreads)
    {
        _sem.WaitOne();
        ++semCount;
    }
    // All items are processed

    // Clear the semaphore for next time.
    _sem.Release(semCount);
}

void Process(object o)
{
    // do the processing ...

    // release the semaphore
    _sem.Release();
}

私の例では4つのスレッドのみを使用しました。これは、コアの数が多いためです。一度に処理できるスレッドが4つしかない場合、20のスレッドを使用することはほとんど意味がありません。MaxThreadsただし、必要に応じて番号を自由に増やすことができます。

于 2013-02-27T20:17:01.700 に答える
2

したがって、これはすべて.NET2.0であると確信しています。

Action私はそれを使用することに慣れているので、定義を開始します。3.5以降でこのソリューションを使用する場合は、その定義を削除してください。

次に、入力に基づいてアクションのキューを作成します。

その後、コールバックを定義します。このコールバックはメソッドの要です。

最初にキュー内の次のアイテムを取得します(キューはスレッドセーフではないため、ロックを使用します)。取得するアイテムがある場合は、そのアイテムを実行します。次に、「それ自体」である新しいアイテムをスレッドプールに追加します。これは再帰的な匿名の方法です(それほど頻繁に使用されることはありません)。これは、コールバックが最初に呼び出されたときに1つのアイテムを実行し、次に別のアイテムを実行するタスクをスケジュールし、そのアイテムが別のアイテムを実行するタスクをスケジュールするということを意味します。最終的にキューがなくなり、それ以上のアイテムのキューイングが停止します。

また、すべてが完了するまでメソッドをブロックする必要があるため、カウンターをインクリメントすることで、これらのコールバックがいくつ終了したかを追跡します。そのカウンターがタスク制限に達すると、イベントを通知します。

最後に、スレッドプールでこれらのコールバックのN個を開始します。

public delegate void Action();
public static void Execute(IEnumerable<Action> actions, int maxConcurrentItems)
{
    object key = new object();
    Queue<Action> queue = new Queue<Action>(actions);
    int count = 0;
    AutoResetEvent whenDone = new AutoResetEvent(false);

    WaitCallback callback = null;
    callback = delegate
    {
        Action action = null;
        lock (key)
        {
            if (queue.Count > 0)
                action = queue.Dequeue();
        }
        if (action != null)
        {
            action();
            ThreadPool.QueueUserWorkItem(callback);
        }
        else
        {
            if (Interlocked.Increment(ref count) == maxConcurrentItems)
                whenDone.Set();
        }

    };

    for (int i = 0; i < maxConcurrentItems; i++)
    {
        ThreadPool.QueueUserWorkItem(callback);
    }

    whenDone.WaitOne();
}

スレッドプールを使用せず、固定数のスレッドのみを使用する別のオプションを次に示します。

public static void Execute(IEnumerable<Action> actions, int maxConcurrentItems)
{
    Thread[] threads = new Thread[maxConcurrentItems];
    object key = new object();
    Queue<Action> queue = new Queue<Action>(actions);
    for (int i = 0; i < maxConcurrentItems; i++)
    {
        threads[i] = new Thread(new ThreadStart(delegate
        {
            Action action = null;
            do
            {
                lock (key)
                {
                    if (queue.Count > 0)
                        action = queue.Dequeue();
                    else 
                        action = null;
                }
                if (action != null)
                {
                    action();
                }
            } while (action != null);
        }));
        threads[i].Start();
    }

    for (int i = 0; i < maxConcurrentItems; i++)
    {
        threads[i].Join();
    }
}
于 2013-02-27T20:23:06.047 に答える