37

処理が必要な多くのジョブを保持しているキューについて考えてみます。キューの制限は、一度に1つのジョブしか取得できず、ジョブの数を知る方法がありません。ジョブは完了するのに10秒かかり、Webサービスからの応答を多くの待機が必要になるため、CPUに依存しません。

このようなものを使うと

while (true)
{
   var job = Queue.PopJob();
   if (job == null)
      break;
   Task.Factory.StartNew(job.Execute); 
}

次に、ジョブを完了するよりもはるかに速くキューから猛烈にポップし、メモリを使い果たして、お尻に落ちます。>。<

Parallel.InvokeまたはParallel.ForEachを使用できないため、ParallelOptions.MaxDegreeOfParallelismを使用できません(私は思いません)

私が見つけた3つの選択肢

  1. Task.Factory.StartNewをに置き換えます

    Task task = new Task(job.Execute,TaskCreationOptions.LongRunning)
    task.Start();
    

    これは問題をいくらか解決するようですが、これが何をしているのか、そしてこれが最良の方法であるかどうかは正確にはわかりません。

  2. 同時実行の程度を制限するカスタムタスクスケジューラを作成する

  3. BlockingCollectionのようなものを使用して、開始時にコレクションにジョブを追加し、終了時に削除して、実行できる数を制限します。

#1では、正しい決定が自動的に行われることを信頼する必要があります。#2 /#3自分で実行できるタスクの最大数を計算する必要があります。

私はこれを正しく理解しましたか?どちらがより良い方法ですか、それとも別の方法がありますか?

編集-これは、以下の回答、生産者/消費者パターンから私が思いついたものです。

全体的なスループットの目的は、処理可能な速度よりも速くジョブをデキューすることではなく、複数のスレッドのポーリングキューを持たないことでした(ここには示されていませんが、これは非ブロッキングオペレーションであり、複数の場所から高頻度でポーリングされると、膨大なトランザクションコストが発生します) 。

// BlockingCollection<>(1) will block if try to add more than 1 job to queue (no
// point in being greedy!), or is empty on take.
var BlockingCollection<Job> jobs = new BlockingCollection<Job>(1);

// Setup a number of consumer threads.
// Determine MAX_CONSUMER_THREADS empirically, if 4 core CPU and 50% of time
// in job is blocked waiting IO then likely be 8.
for(int numConsumers = 0; numConsumers < MAX_CONSUMER_THREADS; numConsumers++)
{
   Thread consumer = new Thread(() =>
   {
      while (!jobs.IsCompleted)
      {
         var job = jobs.Take();
         job.Execute();
      }
   }
   consumer.Start();
}

// Producer to take items of queue and put in blocking collection ready for processing
while (true)
{
    var job = Queue.PopJob();
    if (job != null)
       jobs.Add(job);
    else
    {
       jobs.CompletedAdding()
       // May need to wait for running jobs to finish
       break;
    }
}
4

6 に答える 6

22

I just gave an answer which is very applicable to this question.

Basically, the TPL Task class is made to schedule CPU-bound work. It is not made for blocking work.

You are working with a resource that is not CPU: waiting for service replies. This means the TPL will mismange your resource because it assumes CPU boundedness to a certain degree.

Manage the resources yourself: Start a fixed number of threads or LongRunning tasks (which is basically the same). Decide on the number of threads empirically.

You can't put unreliable systems into production. For that reason, I recommend #1 but throttled. Don't create as many threads as there are work items. Create as many threads which are needed to saturate the remote service. Write yourself a helper function which spawns N threads and uses them to process M work items. You get totally predictable and reliable results that way.

于 2012-06-21T13:43:00.947 に答える
12

後でコードまたはサードパーティライブラリで発生する可能性のあるフローの分割と継続はawait、長時間実行されるタスク(またはスレッド)ではうまく機能しないため、長時間実行されるタスクをわざわざ使用しないでください。世界ではasync/await、それらは役に立たない。詳細はこちら

呼び出すことはできますThreadPool.SetMaxThreadsが、この呼び出しを行う前に、スレッドの最小数をThreadPool.SetMinThreads、最大値以下の値を使用して設定していることを確認してください。ちなみに、MSDNのドキュメントは間違っています。少なくとも.NET4.5および4.6では、この手法を使用してメモリが制限された32ビットサービスの処理能力を削減しましたが、これらのメソッド呼び出しを使用して、マシンのコア数を下回ることができます。

ただし、アプリ全体を制限せず、アプリの処理部分だけを制限したい場合は、カスタムタスクスケジューラがその役割を果たします。ずっと前に、MSはを含むいくつかのカスタムタスクスケジューラを備えたサンプルLimitedConcurrencyLevelTaskSchedulerをリリースしました。を使用してメインの処理タスクを手動でTask.Factory.StartNew生成し、カスタムタスクスケジューラを提供します。これによって生成される他のすべてのタスクは、メソッドの早い段階で非同期を実現するために使用されasync/awaitます。Task.Yieldasync

ただし、特定のケースでは、どちらのソリューションも、完了する前にジョブのキューを使い果たすことを止めません。あなたのキューの実装と目的によっては、それは望ましくないかもしれません。それらは、「一連のタスクを実行し、スケジューラーにそれらを実行する時間を見つけさせる」タイプのソリューションのようなものです。したがって、ここでもう少し適切なのは、を介してジョブの実行をより厳密に制御する方法である可能性がありますsemaphores。コードは次のようになります。

semaphore = new SemaphoreSlim(max_concurrent_jobs);

while(...){
 job = Queue.PopJob();
 semaphore.Wait();
 ProcessJobAsync(job);
}

async Task ProcessJobAsync(Job job){
 await Task.Yield();
 ... Process the job here...
 semaphore.Release();
}

猫の皮を剥ぐ方法は複数あります。適切と思われるものを使用してください。

于 2016-03-19T19:28:10.030 に答える
8

Microsoftには、DataFlowと呼ばれる非常に優れたライブラリがあり、これはまさにあなたが望むこと(およびそれ以上)を実行します。詳細はこちら

ActionBlockクラスを使用し、ExecutionDataflowBlockOptionsオブジェクトのMaxDegreeOfParallelismを設定する必要があります。ActionBlockはasync/awaitとうまく連携するため、外部呼び出しが待機している場合でも、新しいジョブの処理は開始されません。

ExecutionDataflowBlockOptions actionBlockOptions = new ExecutionDataflowBlockOptions
{
     MaxDegreeOfParallelism = 10
};

this.sendToAzureActionBlock = new ActionBlock<List<Item>>(async items => await ProcessItems(items),
            actionBlockOptions);
...
this.sendToAzureActionBlock.Post(itemsToProcess)
于 2015-01-23T07:02:35.383 に答える
7

ここでの問題は、実行 Task中のが多すぎるようには見えません。スケジュールされ Taskているが多すぎます。Taskコードは、実行速度に関係なく、できるだけ多くのをスケジュールしようとします。そして、あなたがあまりにも多くの仕事を持っているならば、これはあなたがOOMを得るということを意味します。

このため、提案されたソリューションのいずれも実際には問題を解決しません。単に指定するだけLongRunningで問題が解決するように思われる場合Threadは、新しいものを作成するLongRunningのに時間がかかり、新しいジョブの取得が効果的に抑制されるためである可能性があります。したがって、この解決策は偶然にのみ機能し、後で他の問題を引き起こす可能性があります。

解決策に関しては、私はusrにほぼ同意します。合理的にうまく機能する最も簡単な解決策は、固定数のLongRunningタスクを作成し、呼び出し(そのメソッドがスレッドセーフでない場合は、Queue.PopJob()によって保護されます)とジョブを呼び出す1つのループを持つことです。lockExecute()

更新:もう少し考えた後、私は次の試みがひどく振る舞う可能性が高いことに気づきました。それがあなたのためにうまくいくと本当に確信している場合にのみそれを使用してください。


ただし、TPLは、IOバウンドの場合でも、最適な並列処理の程度を把握しようとしTaskます。だから、あなたはそれをあなたの利益のために使おうとするかもしれません。TaskTPLの観点からは、作業が行われていないように見え、新しいTasksが何度も開始されるため、ここでは長いsは機能しません。代わりにできることはTask、それぞれの終わりに新しいものを開始することですTask。このようにして、TPLは何が起こっているかを認識し、そのアルゴリズムはうまく機能する可能性があります。また、TPLに並列度を決定させるために、Taskその行の最初にあるaの開始時に、 Tasksの別の行を開始します。

このアルゴリズムうまくいくかもしれません。しかし、TPLが並列処理の程度に関して悪い決定を下す可能性もあります。私は実際にこのようなことを試したことがありません。

コードでは、次のようになります。

void ProcessJobs(bool isFirst)
{
    var job = Queue.PopJob(); // assumes PopJob() is thread-safe
    if (job == null)
        return;

    if (isFirst)
        Task.Factory.StartNew(() => ProcessJobs(true));

    job.Execute();

    Task.Factory.StartNew(() => ProcessJob(false));
}

そしてそれを

Task.Factory.StartNew(() => ProcessJobs(true));
于 2012-06-21T13:59:26.887 に答える
1

TaskCreationOptions.LongRunningタスクをブロックするのに役立ち、ここでそれを使用することは合法です。それが行うことは、タスクにスレッドを専用にすることをスケジューラーに提案することです。スケジューラ自体は、過度のコンテキストスイッチングを回避するために、スレッドの数をCPUコアの数と同じレベルに維持しようとします。

これは、Joseph AlbahariによるC#のスレッド化で詳しく説明されています。

于 2012-06-21T15:56:07.043 に答える
1

これを実現するために、メッセージキュー/メールボックスメカニズムを使用しています。それはアクターモデルに似ています。MailBoxを持つクラスがあります。私はこのクラスを「労働者」と呼んでいます。メッセージを受信できます。これらのメッセージはキューに入れられ、基本的に、ワーカーに実行させたいタスクを定義します。ワーカーは、次のメッセージをデキューして次のタスクを開始する前に、タスクを終了するためにTask.Wait()を使用します。

私が持っているワーカーの数を制限することで、実行されている同時スレッド/タスクの数を制限することができます。

これは、分散コンピューティングエンジンに関する私のブログ投稿で、ソースコードとともに概説されています。IActorとWorkerNodeのコードを見ると、それが理にかなっていると思います。

https://long2know.com/2016/08/creating-a-distributed-computing-engine-with-the-actor-model-and-net-core/

于 2016-09-08T20:23:55.277 に答える