5

複数のスレッドがアイテムを入れることができるときに、複数のスレッドを使用して同時キュー内のアイテムを処理しようとしている場合、理想的な解決策は、同時データ構造で Reactive Extensions を使用することであることに気付きました。

私の元の質問は次のとおりです。

ConcurrentQueue を使用している間、並列にループしながらデキューしようとしています

したがって、項目が入れられると継続的にデキューされる LINQ (または PLINQ) クエリを持つ方法があるかどうか、私は興味があります。

n 個のプロデューサーがキューにプッシュし、限られた数のスレッドを処理できるようにこれを機能させようとしているので、データベースに過負荷をかけません。

Rx フレームワークを使用できれば、すぐに開始できると思います。100 ミリ秒以内に 100 個のアイテムが配置された場合、PLINQ クエリの一部である 20 個のスレッドがキューを介して処理されます。

私が連携しようとしている 3 つのテクノロジがあります。

  1. Rx フレームワーク (リアクティブ LINQ)
  2. プリング
  3. System.Collections.Concurrent 構造体
4

2 に答える 2

6

Drewは正しいです。ConcurrentQueueは仕事に最適に聞こえますが、実際にはBlockingCollectionが使用する基礎となるデータ構造であると思います。私にも非常に後ろから前に見えます。この本の第7章をチェックしてください* http://www.amazon.co.uk/Parallel-Programming-Microsoft-NET-Decomposition/dp/0735651590/ref=sr_1_1?ie = UTF8 & qid = 1294319704&sr = 8-1 BlockingCollectionの使用方法を説明し、複数のプロデューサーと複数のコンシューマーがそれぞれ「キュー」を削除するようにします。「GetConsumingEnumerable()」メソッドを確認し、場合によってはそのメソッドで.ToObservable()を呼び出すことをお勧めします。

*本の残りの部分はかなり平均的です。

編集:

これが私があなたが望むことをするだろうと思うサンプルプログラムです?

class Program
{
    private static ManualResetEvent _mre = new ManualResetEvent(false);
    static void Main(string[] args)
    {
        var theQueue = new BlockingCollection<string>();
        theQueue.GetConsumingEnumerable()
            .ToObservable(Scheduler.TaskPool)
            .Subscribe(x => ProcessNewValue(x, "Consumer 1", 10000000));

        theQueue.GetConsumingEnumerable()
            .ToObservable(Scheduler.TaskPool)
            .Subscribe(x => ProcessNewValue(x, "Consumer 2", 50000000));

        theQueue.GetConsumingEnumerable()
            .ToObservable(Scheduler.TaskPool)
            .Subscribe(x => ProcessNewValue(x, "Consumer 3", 30000000));


        LoadQueue(theQueue, "Producer A");
        LoadQueue(theQueue, "Producer B");
        LoadQueue(theQueue, "Producer C");

        _mre.Set();

        Console.WriteLine("Processing now....");

        Console.ReadLine();
    }

    private static void ProcessNewValue(string value, string consumerName, int delay)
    {
        Thread.SpinWait(delay);
        Console.WriteLine("{1} consuming {0}", value, consumerName);
    }

    private static void LoadQueue(BlockingCollection<string> target, string prefix)
    {
        var thread = new Thread(() =>
                                    {
                                        _mre.WaitOne();
                                        for (int i = 0; i < 100; i++)
                                        {
                                            target.Add(string.Format("{0} {1}", prefix, i));
                                        }
                                    });
        thread.Start();
    }
}
于 2011-01-06T13:18:57.720 に答える
3

Rx でこれを達成する最善の方法はわかりませんがBlockingCollection<T>プロデューサーとコンシューマーのパターンを使用することをお勧めします。メイン スレッドはアイテムをコレクションに追加します。コレクションはConcurrentQueue<T>デフォルトで下を使用します。次に、システムにとって意味のあるコレクションからできるだけ多くのアイテムを同時に処理するために使用するTaskものよりも先にスピンアップするセパレートがあります。この場合、デフォルトのパーティショナーは必要以上のオーバーヘッドを作成するため、最も効率的にするために、おそらく ParallelExtensions ライブラリのメソッドの使用も検討する必要があります。詳細については、このブログ投稿を参照してください。Parallel::ForEachBlockingCollection<T>GetConsumingPartitioner

メイン スレッドが終了したら、すべてのコンシューマーがコレクション内のすべての項目の処理を完了するまで待機するために、スピンアップしたおよびでCompleteAddingを呼び出します。BlockingCollection<T>Task::WaitTask

于 2010-12-01T05:26:49.890 に答える