3

次のシナリオがあります。

  1. データベースから 50 個のジョブをブロッキング コレクションに取り込みます。

  2. 各ジョブは長時間実行されます。(潜在的に可能性があります)。だから私はそれらを別のスレッドで実行したい。(私は知っています-それらを Task.WhenAll として実行し、TPL にそれを理解させる方が良いかもしれません-しかし、同時に実行する数を制御したい)

  3. 5つ同時に実行したいとします(構成可能)

  4. 各ジョブに 1 つずつ、合計 5 つのタスク (TPL) を作成し、それらを並行して実行します。

私がやりたいことは、ステップ 4 のジョブの 1 つが完了するとすぐにブロッキング コレクション内の次のジョブを取得し、50 個すべてが完了するまで続行することです。

ジョブが完了すると呼び出される静的なblockingCollectionとTaskCompletionSourceを作成し、コンシューマを再度呼び出してキューから一度に1つのジョブを選択することを考えています。また、各ジョブで async/await を呼び出したいと思いますが、これに加えて、アプローチに影響があるかどうかはわかりません。

これは私がやろうとしていることを達成する正しい方法ですか?

このリンクに似ていますが、キャッチは、最初の N 項目の 1 つが完了したらすぐに次のジョブを処理したいということです。すべての N が完了した後ではありません。

アップデート :

わかりました、誰かが後でそれを使用したい場合は、このコードスニペットがまさに私が望むことをしています。以下に示すように、5 つのスレッドが作成され、現在のジョブが完了すると、各スレッドは次のジョブを開始します。常に 5 つのスレッドのみがアクティブです。これが常にこのように 100% 機能するとは限らず、1 つの CPU/コアで使用するとコンテキスト切り替えのパフォーマンスの問題が発生することを理解しています。

var block = new ActionBlock<Job>(
                job => Handler.HandleJob(job), 
                    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

              foreach (Job j in GetJobs())
                  block.SendAsync(j);

ジョブ 2 はスレッド :13 で開始されました。待ち時間:3600000ms。時間:2014/08/29 15:14:43

ジョブ 4 はスレッド :14 で開始されました。待ち時間:15000ms。時間:2014/08/29 15:14:43

ジョブ 0 がスレッド :7 で開始されました。待ち時間:600000ms。時間:2014/08/29 15:14:43

ジョブ 1 はスレッド :12 で開始されました。待ち時間:900000ms。時間:2014/08/29 15:14:43

ジョブ 3 はスレッド :11 で開始されました。待ち時間:120000ms。時間:2014/08/29 15:14:43

ジョブ 4 はスレッド :14 で終了しました。2014/8/29 15:14:58

ジョブ 5 はスレッド :14 で開始されました。待ち時間:1800000ms。時間:2014/08/29 15:14:58

ジョブ 3 はスレッド :11 で終了しました。2014/8/29 15:16:43

ジョブ 6 がスレッド :11 で開始されました。待ち時間:1200000ms。時間:2014/08/29 15:16:43

ジョブ 0 はスレッド :7 で終了しました。2014/8/29 15:24:43

ジョブ 7 がスレッド :7 で開始されました。待ち時間:30000ms。時間:2014/08/29 15:24:43

ジョブ 7 はスレッド :7 で終了しました。2014/8/29 15:25:13

ジョブ 8 がスレッド :7 で開始されました。待ち時間:100000ms。時間:2014/08/29 15:25:13

ジョブ 8 はスレッド :7 で終了しました。2014/8/29 15:26:53

ジョブ 9 がスレッド :7 で開始されました。待ち時間:900000ms。時間:2014/08/29 15:26:53

ジョブ 1 はスレッド :12 で終了しました。2014/8/29 15:29:43

ジョブ 10 がスレッド :12 で開始されました。待ち時間:300000ms。時間:2014/08/29 15:29:43

ジョブ 10 はスレッド :12 で終了しました。2014/8/29 15:34:43

ジョブ 11 がスレッド :12 で開始されました。待ち時間:600000ms。時間:2014/08/29 15:34:43

ジョブ 6 はスレッド :11 で終了しました。2014/8/29 15:36:43

ジョブ 12 がスレッド :11 で開始されました。待ち時間:300000ms。時間:2014/08/29 15:36:43

ジョブ 12 はスレッド :11 で終了しました。2014/8/29 15:41:43

ジョブ 13 がスレッド :11 で開始されました。待ち時間:100000ms。時間:2014/08/29 15:41:43

ジョブ 9 はスレッド :7 で終了しました。2014/8/29 15:41:53

ジョブ 14 がスレッド :7 で開始されました。待ち時間:300000ms。時間:2014/08/29 15:41:53

ジョブ 13 はスレッド :11 で終了しました。2014/8/29 15:43:23

ジョブ 11 はスレッド :12 で終了しました。2014/8/29 15:44:43

ジョブ 5 はスレッド :14 で終了しました。2014/8/29 15:44:58

ジョブ 14 はスレッド :7 で終了しました。2014/8/29 15:46:53

ジョブ 2 はスレッド :13 で終了しました。2014/8/29 16:14:43

4

3 に答える 3

5

を使用して、必要なものを簡単に実現できますTPL Dataflow

できることは、データを格納するためのバッファである を使用し、からのリクエストを消費するBufferBlock<T>と一緒にリンクすることです。ActionBlock<T>BufferBlock<T>

ここでの利点は、クラスActionBlock<T>を使用して同時に処理するリクエストの数を指定できることです。ExecutionDataflowBlockOptions

これは単純化されたコンソール バージョンで、着信時に多数の数字を処理し、それらの名前とThread.ManagedThreadID.

private static void Main(string[] args)
{
    var bufferBlock = new BufferBlock<int>();

    var actionBlock =
        new ActionBlock<int>(i => Console.WriteLine("Reading number {0} in thread {1}",
                                  i, Thread.CurrentThread.ManagedThreadId),
                             new ExecutionDataflowBlockOptions 
                                 {MaxDegreeOfParallelism = 5});

    bufferBlock.LinkTo(actionBlock);
    Produce(bufferBlock);

    Console.ReadKey();
}

private static void Produce(BufferBlock<int> bufferBlock)
{
    foreach (var num in Enumerable.Range(0, 500))
    {
        bufferBlock.Post(num);
    }
}

必要に応じて、 awaitable を使用して非同期に投稿することもできますBufferBlock.SendAsync

そうすればTPL、手動で行う必要なく、すべての調整を処理できます。

于 2014-08-29T07:28:49.487 に答える