0

私のシステムには、長時間実行されているがCPUにバインドされていない「ジョブ」がたくさんあります。これらを処理するためにワーカーロールを設定したいのですが、1つのワーカーロールで10〜20のスレッドをすべて同時に処理することができるように、十分にスケーラブルです。

ここにTPLの使用を提案する質問がありますが、これについては限られた経験しかありません。ただし、スレッドの数が最大になるようにスレッドを管理する方法と、スレッドが解放されたときにスレッドをディスパッチする方法がわかりません。

これを少し複雑にしているのは、各「ジョブ」が必要とするサービスを作成するためにNinjectを使用したいということです。

これが私の頭の中で機能していることをイメージする方法です:

while (true)
{
    // Don't go unless we have a free slot (how do I implement this?!)
    if (FreeThreadExists)
    {
        // Get the next message
        CloudQueueMessage ThisMessage = Queue.GetMessage(TimeSpan.FromMinutes(3));

        // Get the new job and inject services
        Job MyJob = Kernel.Get<Job>();

        // Start this job
        // Will I need to keep ahold of this Task?
        // And how do I know when it's done so that FreeThreadExists changes?
        Task.Factory.StartNew(() => MyJob.Run(ThisMessage));
    }
    else
    {    
        // Sleep to prevent choking
        Thread.Sleep(500);
    }
}

次に、そのスレッドで、完了時にメッセージを削除します。基本的に、Azureの機能をあまり失うことなく、1つのワーカーを20の「インスタンス」に分割しようとしています(具体的には、キューメッセージのタイムアウト/再試行機能が必要です)。

私は.NETスレッドにかなり不慣れですが、これを実行するための最良の方法は何ですか?

編集:うわー、私は重要なポイントを追加するのを完全に忘れました:これは複数のワーカーにまたがってスケーリングする必要があります。したがって、それぞれ10個のスレッドを持つ10個のワーカーロールのメッセージは、UIフロントエンドによってキューに入れられ、次にキューから取り出されて、空きスレッドを持つ最初のワーカーによって実行されます。

4

2 に答える 2

1

あなたがあなたのコードで述べたように、私は通常TPLも含みます。Gauravのアプローチの代替として、以下のコードを参照してください。以下の擬似コードは、スレッドの作成を制御するParallel.Forを使用しています。各スレッドは無限ループを開始して実行します。仕事がない場合は、少し寝てください。

// Start 10 threads
Parallel.For(0, 10, (i) =>
{
  while (true)
  {
    // Get message from queue
    var msg = Queue.GetMessage();
    if (msg != null)
    {
      // Do some work here...
      StartSomeJob();

      // Then when you are done, delete the message
      Queue.DeleteMessage(msg);
    }
    // Wait 1 second before fetching next work item from queue
    System.Threading.Thread.Sleep(1000);
  }
});
于 2012-10-23T23:20:19.447 に答える
1

この擬似コードを試してください:

while (true)
{
    int maxThreadsPerWorkerRole = 3;//assuming each worker role can handle 3 jobs simultaneously
    var messages = Queue.GetMessages(3);//Get 3 messages from the queue
    if (messages != null && messages.Count > 0)//Ensuring there is some work which needs to be done
    {
        var myTasks = new List<Task>();
        for (int i=0; i<messages.Count; i++)
        {
            Job MyJob = Kernel.Get<Job>();//Get the job
            var task = Task.Factory.StartNew(() => MyJob.Run(messages[i])); 
            myTasks.Add(task);
        }
        Task.WaitAll(myTasks.ToArray());//Wait for all tasks to complete.
        for (int i=0; i<messages.Count; i++)
        {
            //Write code to delete the message.
        }
        //Check if the queue is empty or not. If the queue is not empty, then repeat this loop
        //Otherwise simply exit this loop.
        if (Queue.RetrieveApproximateMessageCount() == 0)
        {
            break;
        }
    }
}

お役に立てれば。

于 2012-10-23T15:56:52.473 に答える