1

BlockingCollectionとプロデューサー/コンシューマーの問題に頭を悩ませようとしています。

私が達成したいのは、次のとおりです。

  • オブジェクト(「ジョブ」)のリストをFIFO方式で保持するためのスレッドセーフなキュー。
  • これらのジョブの結果のリストをFIFO方式で保持する2番目のスレッドセーフキュー。

言い換えると:

Inbound "Job" Data, can come at any time from multiple threads 
   ==> Thread-Safe FIFO Queue 1 "FQ1"
      ==> Async Processing of data in FQ1 (and remove item from FQ1)
         ==> Callback/Results into Thread-Safe FIFO Queue 2 "FQ2"
            ==> Async Processing of data in FQ2 (and remove item from FQ2)
               ==> Done

これまでの私の謙虚な試みは次のとおりです。

private BlockingCollection<InboundObject> fq1;
private BlockingCollection<ResultObject> fq2;

(...)

Task.Factory.StartNew(() =>
{
    foreach (InboundObject a in fq1.GetConsumingEnumerable())
       a.DoWork(result => fq2.Add(result)); //a.DoWork spits out an Action<ResultObject>
}

BlockingCollectionを選択した理由の1つは、負荷を最小限に抑えたいためです。つまり、アイテムが実際にコレクション内にある場合にのみ機能します(待機/スリープは処理しません)。foreachがそのための正しいアプローチであるかどうかはわかりません。

これが正しいかどうか、またはより良いアプローチがあるかどうかを教えてください。ありがとう!

編集 ユニットテストから、タスク内の作業は実際には同期的であることがわかりました。新しいバージョンは次のとおりです。

Task.Factory.StartNew(() =>
{
    foreach (InboundObject a in fq1.GetConsumingEnumerable())
       Task.Factory.StartNew(async () => { fq2.Add(await a.DoWork()); });
}

入力は大歓迎です!

4

1 に答える 1

1

BlockingCollection を選択した理由の 1 つは、負荷を最小限に抑えたいからです。つまり、アイテムが実際にコレクション内にある場合にのみ作業を行います (待機/スリープを処理しません)。foreach がそのための正しいアプローチであるかどうかはわかりません。

これは正しいアプローチであり、foreach新しいアイテムがキューに追加されるかCompleteAddingメソッドが呼び出されるまでブロックされます。BlockingCollection で非同期処理を実現したいというのは正しくありません。BlockingCollection は単純なプロデューサー/コンシューマー キューであり、ジョブとジョブの結果が処理される順序を維持する必要がある場合に使用する必要があります。そのため、同期です。ジョブは、追加された順序で処理されます。

非同期実行だけが必要な場合は、キューは必要ありません。この場合、TPL を使用できます。ジョブごとに新しいタスクを生成するだけで、TPL によって内部的にキューに入れられ、システムが効率的に処理できる限り多くの OS スレッドが使用されます。たとえば、ジョブは独自のタスクを生成できます。これは、はるかに柔軟なアプローチです。

また、プロデューサー/コンシューマー キューを使用して、ジョブのパイプライン実行を整理することもできます。この場合、ジョブをいくつかのステップに分割する必要があります。各ステップは専用スレッドで実行する必要があります。すべてのジョブ ステップ スレッドで、1 つのキューからジョブを読み取り、このジョブを実行してから、次のキューに入れなければなりません。

interface IJob
{
    void Step1();
    void Step2();
    ...
}

var step1 = new BlockingCollection<IJob>();
var step2 = new BlockingCollection<IJob>();
...

Task.Factory.StartNew(() =>
    {
        foreach(var step in step1.GetConsumingEnumerable()) {
            step.Step1();
            step2.Add(step);
        }
    });

Task.Factory.StartNew(() =>
    {
        foreach(var step in step2.GetConsumingEnumerable()) {
            // while performing Step2, another thread can execute Step1
            // of the next job
            step.Step2();
            step3.Add(step);
        }
    });

この場合、ジョブは FIFO 順で並列に実行されます。しかし、パイプライン処理を行う場合は、まず負荷分散について考える必要があります。ステップの 1 つに時間がかかりすぎると、そのキューが大きくなり、ほとんどの場合、他のスレッドがアイドル状態になります。

于 2012-12-17T10:35:57.590 に答える