2

.NET 4アプリケーションで並列データ構造を使用していますが、ConcurrentQueue処理中に追加されるデータ構造があります。

私は次のようなことをしたい:

personqueue.AsParallel().WithDegreeOfParallelism(20).ForAll(i => ... );

データを保存するためにデータベース呼び出しを行うため、同時スレッドの数を制限しています。

しかし、私はがデキューされないことを期待しており、ForAll私はただ行うことを心配しています

ForAll(i => {
    personqueue.personqueue.TryDequeue(...);
    ...
});

私が正しいものを飛び出しているという保証はないので。

では、コレクションとデキューを並行して繰り返すにはどうすればよいでしょうか。

または、PLINQこの処理を並行して実行するために使用する方がよいでしょうか?

4

2 に答える 2

5

ここで何をアーカイブしようとしているのか、100%わかりません。何も残らなくなるまで、すべてのアイテムをデキューしようとしていますか?または、一度にたくさんのアイテムをデキューしますか?

最初のおそらく予期しない動作は、次のステートメントで始まります。

 theQueue.AsParallel()

ConcurrentQueueの場合、「スナップショット」-列挙子を取得します。したがって、並行スタックを反復処理する場合は、スナップショットのみを反復処理し、「ライブ」キューは反復処理しません。

一般に、反復中に変更しているものを反復することはお勧めできません。

したがって、別のソリューションは次のようになります。

        // this way it's more clear, that we only deque for theQueue.Count items
        // However after this, the queue is probably not empty
        // or maybe the queue is also empty earlier   
        Parallel.For(0, theQueue.Count,
                     new ParallelOptions() {MaxDegreeOfParallelism = 20},
                     () => { 
                         theQueue.TryDequeue(); //and stuff
                     });

これにより、反復中に何かを操作する必要がなくなります。ただし、そのステートメントの後でも、forループ中に追加されたデータをキューに含めることができます。

キューを一時的に空にするには、おそらくもう少し作業が必要です。これは本当に醜い解決策です。キューにまだアイテムがある間に、新しいタスクを作成します。各タスクの開始は、可能な限りキューからデキューします。最後に、すべてのタスクが終了するのを待ちます。並列処理を制限するために、20を超えるタスクを作成することはありません。

        // Probably a kitty died because of this ugly code ;)
        // However, this code tries to get the queue empty in a very aggressive way
        Action consumeFromQueue = () =>
                                      {
                                          while (tt.TryDequeue())
                                          {
                                              ; // do your stuff
                                          }
                                      };
        var allRunningTasks = new Task[MaxParallism];
        for(int i=0;i<MaxParallism && tt.Count>0;i++)
        {
            allRunningTasks[i] = Task.Factory.StartNew(consumeFromQueue);  
        }
        Task.WaitAll(allRunningTasks);
于 2010-06-10T15:37:54.600 に答える
0

実際のサイト全体で高いレベルを目指していて、すぐにDBを更新する必要がない場合は、追加のレイヤーライブラリよりも、非常に保守的なソリューションを選択する方がはるかに優れています。

固定サイズの配列(推定サイズ-たとえば1000アイテムまたはN秒相当のリクエスト)とインターロックされたインデックスを作成して、リクエストがデータをスロットに入れて返すようにします。1つのブロックがいっぱいになったら(カウントをチェックし続けます)、別のブロックを作成し、非同期デリゲートを生成して、いっぱいになったばかりのブロックを処理してSQLに送信します。デリゲートがすべてのデータをコンマ区切りの配列にパックできるデータの構造によっては、単純なXML(もちろん、その1つのパフォーマンスをテストする必要があります)をSQL sprocに送信して、レコードの処理に最適なものにすることができます。記録によると、大きなロックを保持することはありません。重くなった場合は、ブロックをいくつかの小さなブロックに分割できます。重要なのは、SQLへのリクエストの数を最小限に抑え、常に1度の分離を維持し、

これは、Parallel-sをいじるよりもはるかに高速になります。

于 2010-08-27T13:52:15.070 に答える