1

データベースからキューを読み取って処理するプログラムを作成する必要があり、すべてのキューは並行して実行され、ConcurrentDictionary を使用して親スレッドで管理されます。キューを表すクラスがあります。このクラスには、キュー情報と親インスタンス ハンドルを受け取るコンストラクターがあります。キュー クラスには、キューを処理するメソッドもあります。

キュー クラスは次のとおりです。

Class MyQueue { 
protected ServiceExecution _parent;
protect string _queueID;

public MyQueue(ServiceExecution parentThread, string queueID)
{
_parent = parentThread;
_queueID = queueID;
}
public void Process()
{
    try
    {
       //Do work to process
    }
    catch()
    {
       //exception handling
    }
    finally{
       _parent.ThreadFinish(_queueID);
    }

親スレッドはキューのデータセットをループし、新しいキュー クラスをインスタンス化します。Queue オブジェクトの Process メソッドを非同期的に実行する新しいスレッドを生成します。このスレッドは ConcurrentDictionary に追加され、次のように開始されます。

private ConcurrentDictionary<string, MyQueue> _runningQueues = new ConcurrentDictionary<string, MyQueue>();

Foreach(datarow dr in QueueDataset.rows)
{
   MyQueue queue = new MyQueue(this, dr["QueueID"].ToString());
   Thread t = new Thread(()=>queue.Process());
   if(_runningQueues.TryAdd(dr["QueueID"].ToString(), queue)
   {
       t.start();
   }
}

//Method that gets called by the queue thread when it finishes
public void ThreadFinish(string queueID)
{
    MyQueue queue;
    _runningQueues.TryRemove(queueID, out queue);
}

これは非同期キュー処理を管理するための正しいアプローチではないと感じています。この設計でデッドロックが発生する可能性があるのではないかと考えています。さらに、タスクを使用して、新しいスレッドの代わりにキューを非同期で実行したいと考えています。前の実行がまだ完了していない場合、同じキューに対して新しいスレッドまたはタスクを生成しないため、キューを追跡する必要があります。このタイプの並列処理を処理する最良の方法は何ですか?

前もって感謝します!

4

1 に答える 1

2

現在のアプローチについて

実際、それは正しいアプローチではありません。データベースから多数のキューを読み取ると、多数のスレッドが生成され、問題が発生する可能性があります。毎回新しいスレッドを作成します。いくつかのスレッドを作成してから再利用することをお勧めします。また、タスクが必要な場合は、タスクを作成LongRunningして再利用することをお勧めします。


提案されたデザイン

次のデザインを提案します。

  1. データベースからキューを読み取り、それらのキューを BlockingCollection に入れるタスクを 1 つだけ予約します。
  2. 複数のLongRunningタスクを開始して、その BlockingCollection からそれぞれキューを読み取り、そのキューを処理します。
  3. タスクが BlockingCollection から取得したキューの処理を完了すると、その BlockingCollection から別のキューを取得します。
  4. CPU のコアを適切に利用するために、これらの処理タスクの数を最適化します。通常、DB のやり取りは遅いため、YMMV のコア数の 3 倍のタスクを作成できます。

デッドロックの可能性

少なくともアプリケーション側では発生しません。ただし、キューはデータベース トランザクションのキューであるため、データベース側でデッドロックが発生する可能性があります。デッドロックが原因でデータベースがトランザクションをロールバックした場合に、タスクがトランザクションを再開するようにロジックを記述する必要がある場合があります。


サンプルコード

private static void TaskDesignedRun()
{
    var expectedParallelQueues = 1024; //Optimize it. I've chosen it randomly
    var parallelProcessingTaskCount = 4 * Environment.ProcessorCount; //Optimize this too.
    var baseProcessorTaskArray = new Task[parallelProcessingTaskCount];
    var taskFactory = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);

    var itemsToProcess = new BlockingCollection<MyQueue>(expectedParallelQueues);

    //Start a new task to populate the "itemsToProcess"
    taskFactory.StartNew(() =>
    {
        // Add code to read queues and add them to itemsToProcess
        Console.WriteLine("Done reading all the queues...");
        // Finally signal that you are done by saying..
        itemsToProcess.CompleteAdding();
    });

    //Initializing the base tasks
    for (var index = 0; index < baseProcessorTaskArray.Length; index++)
    {
        baseProcessorTaskArray[index] = taskFactory.StartNew(() =>
        {
            while (!itemsToProcess.IsAddingCompleted && itemsToProcess.Count != 0)           {
                MyQueue q;
                if (!itemsToProcess.TryTake(out q)) continue;
                //Process your queue
            }
         });
     }

     //Now just wait till all queues in your database have been read and processed.
     Task.WaitAll(baseProcessorTaskArray);
}
于 2015-09-24T03:20:48.863 に答える