BlockingCollectionとプロデューサー/コンシューマーの問題に頭を悩ませようとしています。
私が達成したいのは、次のとおりです。
- オブジェクト(「ジョブ」)のリストをFIFO方式で保持するためのスレッドセーフなキュー。
- これらのジョブの結果のリストをFIFO方式で保持する2番目のスレッドセーフキュー。
言い換えると:
Inbound "Job" Data, can come at any time from multiple threads
==> Thread-Safe FIFO Queue 1 "FQ1"
==> Async Processing of data in FQ1 (and remove item from FQ1)
==> Callback/Results into Thread-Safe FIFO Queue 2 "FQ2"
==> Async Processing of data in FQ2 (and remove item from FQ2)
==> Done
これまでの私の謙虚な試みは次のとおりです。
private BlockingCollection<InboundObject> fq1;
private BlockingCollection<ResultObject> fq2;
(...)
Task.Factory.StartNew(() =>
{
foreach (InboundObject a in fq1.GetConsumingEnumerable())
a.DoWork(result => fq2.Add(result)); //a.DoWork spits out an Action<ResultObject>
}
BlockingCollectionを選択した理由の1つは、負荷を最小限に抑えたいためです。つまり、アイテムが実際にコレクション内にある場合にのみ機能します(待機/スリープは処理しません)。foreachがそのための正しいアプローチであるかどうかはわかりません。
これが正しいかどうか、またはより良いアプローチがあるかどうかを教えてください。ありがとう!
編集 ユニットテストから、タスク内の作業は実際には同期的であることがわかりました。新しいバージョンは次のとおりです。
Task.Factory.StartNew(() =>
{
foreach (InboundObject a in fq1.GetConsumingEnumerable())
Task.Factory.StartNew(async () => { fq2.Add(await a.DoWork()); });
}
入力は大歓迎です!