3

スレッドプールを使用して同時タスクを実行するマルチスレッドライブラリを実装しようとしています。基本的に、受信した収集パラメーターからスレッドプールにタスクを追加し、処理中の最後のタスクがパルス信号を送信するまで待機します。以前のテストでは成功しましたが、処理が非常に短いタスクでテストしたいときに、奇妙な問題が発生しました。どういうわけか、メインスレッドで待機コマンドが実行される前にパルス信号が送信されるか、同期の努力にもかかわらず単純に見ることができない何かが起こっています。

問題を解決するために、現在はうまく機能している潜在的なパフォーマンス上の利点のために、別の「あまり望ましくない」ソリューションを実装しましたが、このような場合に最初のアプローチが機能しない理由を知りたいと思いました。そもそも、パフォーマンスに関しては、この2つの間に大きな違いはありません。

説明のために、以下のプロセスを簡略化した後、両方のソリューションを追加しています。誰かが私に何が悪いのかを指摘するのを手伝ってもらえますか?

前もって感謝します。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Diagnostics;

namespace TestcodeBenchmark
{
    class Program
    {
        static int remainingTasks = 10000000;
        static Stopwatch casioF91W = new Stopwatch();
        static Random rg = new Random();
        static readonly object waitObject = new object();


        static void Main(string[] args)
        {
            TestLoop(30, remainingTasks);
            Console.ReadKey();
        }

        private static void TestLoop(int loopCount, int remainingCountResetNumber)
        {
            for (int i = 0; i < loopCount; i++)
            {
                remainingTasks = remainingCountResetNumber;
                //When this method is called it eventualy stuck at Monitor.Wait line
                TestInterlocked();

                remainingTasks = remainingCountResetNumber;
                //When this method is called it processes stuff w/o any issues.
                TestManualLock();
                Console.WriteLine();
            }
        }

        private static void TestInterlocked()
        {
            casioF91W.Restart();
            //for (int i = 0; i < remainingTasks; i++)
            //{
            //    ThreadPool.QueueUserWorkItem(delegate { TestInterlockedDecrement(); });
            //}
            int toStart = remainingTasks;
            //for (int i = 0; i < remainingTasks; i++)
            for (int i = 0; i < toStart; i++)
            {
                if (!ThreadPool.QueueUserWorkItem(delegate { TestInterlockedDecrement(); }))
                    Console.WriteLine("Queue failed");
            }
            //lock waitObject to be able to call Monitor.Wait
            lock (waitObject)
            {
                //if waitObject is locked then no worker thread should be able to send a pulse signal
                //however, if pulse signal was sent before locking here remainingTasks should be
                //zero so don't wait if all tasks are processed already
                if (remainingTasks != 0)
                {
                    //release the lock on waitObject and wait pulse signal from the worker thread that 
                    //finishes last task
                    Monitor.Wait(waitObject);
                }
            }
            casioF91W.Stop();
            Console.Write("Interlocked:{0}ms ", casioF91W.ElapsedMilliseconds);
        }

        private static void TestInterlockedDecrement()
        {
            //process task
            //TestWork();
            //Once processing finishes decrement 1 from remainingTasks using Interlocked.Decrement
            //to make sure it is atomic and therefore thread safe. If resulting value is zero
            //send pulse signal to wake main thread.            
            if (Interlocked.Decrement(ref remainingTasks) == 0)
            {
                //Acquire a lock on waitObject to be able to send pulse signal to main thread. If main 
                //thread acquired the lock earlier, this will wait until main thread releases it
                lock (waitObject)
                {
                    //send a pulse signal to main thread to continue
                    Monitor.PulseAll(waitObject);
                }
            }
        }

        private static void TestManualLock()
        {
            casioF91W.Restart();

            //Acquire the lock on waitObject and don't release it until all items are added and
            //Wait method is called. This will ensure wait method is called in main thread
            //before any worker thread can send pulse signal by requiring worker threads to
            //lock waitObject to be able to modify remainingTasks            
            lock (waitObject)
            {
                for (int i = 0; i < remainingTasks; i++)
                {
                    ThreadPool.QueueUserWorkItem(delegate { TestManualDecrement(); });
                }
                Monitor.Wait(waitObject);
            }
            casioF91W.Stop();
            Console.Write("ManualLock:{0}ms ", casioF91W.ElapsedMilliseconds);
        }

        private static void TestManualDecrement()
        {
            //TestWork();
            //try to acquire lock on wait object.
            lock (waitObject)
            {
                //if lock is acquired, decrement remaining tasks by and then check
                //whether resulting value is zero.
                if (--remainingTasks == 0)
                {
                    //send a pulse signal to main thread to continue
                    Monitor.PulseAll(waitObject);
                }
            }
        }

        private static void TestWork()
        {
            //Uncomment following to simulate some work.
            //int i = rg.Next(100, 110);
            //for (int j = 0; j < i; j++)
            //{

            //}
        }
    }
}
4

1 に答える 1

3

タスクを開始すると、ループがタスクを開始しますremainingTasks。ただし、10000に近づくまでに、一部のタスクが完了し、この数が10000未満に減少したため、適切な数のタスクを開始できません。ループを変更して、開始する必要のあるタスクの数を保存すると、コードは正常に実行されます。(の戻り値も確認する必要があることに注意してくださいQueueUserWorkItem。)

        int toStart = remainingTasks;
        for (int i = 0; i < toStart; i++)
        {
            if (!ThreadPool.QueueUserWorkItem(delegate { TestInterlockedDecrement(); }))
                Console.WriteLine("Queue failed");
        }
于 2012-12-27T22:57:52.630 に答える