1

以下のコードは、キューが空であっても、最終的に OutOfMemory 例外が発生するまでスレッドを作成し続けます。Parallel.ForEach を通常の foreach に置き換えると、これは起こりません。これが起こる理由を知っている人はいますか?

public delegate void DataChangedDelegate(DataItem obj);

public class Consumer
{
    public DataChangedDelegate OnCustomerChanged;
    public DataChangedDelegate OnOrdersChanged;

    private CancellationTokenSource cts;
    private CancellationToken ct;
    private BlockingCollection<DataItem> queue;

    public Consumer(BlockingCollection<DataItem> queue) {
        this.queue = queue;
        Start();
    }

    private void Start() {
        cts = new CancellationTokenSource();
        ct = cts.Token;
        Task.Factory.StartNew(() => DoWork(), ct);
    }

    private void DoWork() {

        Parallel.ForEach(queue.GetConsumingPartitioner(), item => {
            if (item.DataType == DataTypes.Customer) {
                OnCustomerChanged(item);
            } else if(item.DataType == DataTypes.Order) {
                OnOrdersChanged(item);
            }
        });
    }
}
4

3 に答える 3

5

Parallel.ForEach()主に境界付きコレクションを処理するために作成されたと思います。GetConsumingPartitioner()また、によって返されるコレクションのようなものは期待していませんMoveNext()

問題はParallel.ForEach()、最適な並列度を見つけようとするため、 が実行できる数Taskの s を開始TaskSchedulerすることです。しかし、は、完了するまでに非常に長い時間がかかるTaskScheduler多くTaskの があり、何もしていない (ブロックしている) ため、新しいものを開始し続けます。

を設定するのが最善の解決策だと思いますMaxDegreeOfParallelism

代わりに、TPL Dataflow のActionBlock. この場合の主な違いは、ActionBlock処理するアイテムがない場合にスレッドをブロックしないため、スレッドの数が制限に近づくことはありません。

于 2012-06-22T07:01:23.497 に答える
3

Producer/Consumer パターンは、Producer と Consumer が 1 つずつしかない場合に主に使用されます。

ただし、達成しようとしているもの (複数のコンシューマー) は、ワークリスト パターンにより適切に適合します。次のコードは、ユタ大学で教えられている並列プログラミング クラスの unit2 スライド「2c - 共有メモリ パターン」のスライドから取られたもので、http://ppcp.codeplex.com/からダウンロードできます。

BlockingCollection<Item> workList;
CancellationTokenSource cts;
int itemcount

public void Run()
{
  int num_workers = 4;

  //create worklist, filled with initial work
  worklist = new BlockingCollection<Item>(
    new ConcurrentQueue<Item>(GetInitialWork()));

  cts = new CancellationTokenSource();
  itemcount = worklist.Count();

  for( int i = 0; i < num_workers; i++)
    Task.Factory.StartNew( RunWorker );
}

IEnumberable<Item> GetInitialWork() { ... }

public void RunWorker() {
  try  {
    do {
      Item i = worklist.Take( cts.Token );
      //blocks until item available or cancelled
          Process(i);
      //exit loop if no more items left
    } while (Interlocked.Decrement( ref itemcount) > 0);
  } finally {
      if( ! cts.IsCancellationRequested )
        cts.Cancel();
    }
  }
}

public void AddWork( Item item) {
  Interlocked.Increment( ref itemcount );
  worklist.Add(item);
}

public void Process( Item i ) 
{
  //Do what you want to the work item here.
}

上記のコードでは、ワークリスト アイテムをキューに追加し、任意の数 (この場合は 4 つ) のワーカーを設定して、アイテムをキューから取り出して処理することができます。

.Net 4.0 の並列処理に関するもう 1 つの優れたリソースは、http: //msdn.microsoft.com/en-us/library/ff963553で無料で入手できる書籍「Parallel Programming with Microsoft .Net」です。

于 2012-06-23T01:08:38.263 に答える
1

タスク並列ライブラリの内部では、Parallel.ForとParallel.Foreachは山登りアルゴリズムに従って、操作に使用する並列処理の量を決定します。

多かれ少なかれ、彼らは1つのタスクでボディを実行することから始め、2つに移動し、ブレークポイントに到達してタスクの数を減らす必要があるまで続きます。

これは、迅速に完了するメソッドボディには非常に効果的ですが、ボディの実行に長い時間がかかる場合、並列処理の量を減らす必要があることに気付くまでに長い時間がかかる場合があります。その時点まで、タスクの追加が続行され、コンピューターがクラッシュする可能性があります。

タスク並列ライブラリの開発者の一人による講義で、上記のことを学びました。

MaxDegreeOfParallelismを指定するのが、おそらく最も簡単な方法です。

于 2012-06-22T23:59:56.097 に答える