Drewは正しいです。ConcurrentQueueは仕事に最適に聞こえますが、実際にはBlockingCollectionが使用する基礎となるデータ構造であると思います。私にも非常に後ろから前に見えます。この本の第7章をチェックしてください*
http://www.amazon.co.uk/Parallel-Programming-Microsoft-NET-Decomposition/dp/0735651590/ref=sr_1_1?ie = UTF8
& qid = 1294319704&sr = 8-1 BlockingCollectionの使用方法を説明し、複数のプロデューサーと複数のコンシューマーがそれぞれ「キュー」を削除するようにします。「GetConsumingEnumerable()」メソッドを確認し、場合によってはそのメソッドで.ToObservable()を呼び出すことをお勧めします。
*本の残りの部分はかなり平均的です。
編集:
これが私があなたが望むことをするだろうと思うサンプルプログラムです?
class Program
{
private static ManualResetEvent _mre = new ManualResetEvent(false);
static void Main(string[] args)
{
var theQueue = new BlockingCollection<string>();
theQueue.GetConsumingEnumerable()
.ToObservable(Scheduler.TaskPool)
.Subscribe(x => ProcessNewValue(x, "Consumer 1", 10000000));
theQueue.GetConsumingEnumerable()
.ToObservable(Scheduler.TaskPool)
.Subscribe(x => ProcessNewValue(x, "Consumer 2", 50000000));
theQueue.GetConsumingEnumerable()
.ToObservable(Scheduler.TaskPool)
.Subscribe(x => ProcessNewValue(x, "Consumer 3", 30000000));
LoadQueue(theQueue, "Producer A");
LoadQueue(theQueue, "Producer B");
LoadQueue(theQueue, "Producer C");
_mre.Set();
Console.WriteLine("Processing now....");
Console.ReadLine();
}
private static void ProcessNewValue(string value, string consumerName, int delay)
{
Thread.SpinWait(delay);
Console.WriteLine("{1} consuming {0}", value, consumerName);
}
private static void LoadQueue(BlockingCollection<string> target, string prefix)
{
var thread = new Thread(() =>
{
_mre.WaitOne();
for (int i = 0; i < 100; i++)
{
target.Add(string.Format("{0} {1}", prefix, i));
}
});
thread.Start();
}
}