6

BlockingCollection(ConcurrentBag、50000)があり、コンシューマースレッドのConcurrentDictionaryで処理できるレコード数を最大化するために、プロデューサースレッドに50,000という非常に小さな境界容量を使用しようとしています。プロデューサーはコンシューマーよりもはるかに高速で、それ以外の場合はほとんどのメモリを消費します。

残念ながら、ConcurrentDictionaryのレコードの総数が、テストデータの実行時に50,000の制限付き容量を追加した後、必要な数よりも大幅に少なくなっていることにすぐに気付きました。BlockingCollectionの.addメソッドは、コレクションにaddを実行するスペースができるまで、無期限にブロックする必要があることを読みました。ただし、これは当てはまらないようです。

質問:

  1. BlockingCollectionの容量が解放される前に呼び出されるaddが多すぎると、BlockingCollectionの.addメソッドは最終的にタイムアウトするか、サイレントに失敗しますか?

  2. #1の答えが「はい」の場合、データを失うことなく境界容量を超えた後、何回追加を試みることができますか?

  3. 容量を待機/ブロックしている多くのBlockingCollection.add()メソッドが呼び出され、CompleteAdding()メソッドが呼び出された場合、それらの待機/ブロック追加は待機を続け、最終的に追加しますか、それともサイレントに失敗しますか?

4

1 に答える 1

11

BlockingCollection を ConcurrentDictionary と一緒に使用している場合は、コードのどこかに BlockingCollection.TryAdd(myobject) メソッドが隠されておらず、それを ConcurrentDictionary.TryAdd() メソッドと間違えていないことを確認してください。BlockingCollection.TryAdd(myobject) は、BlockingCollection の境界容量を超えた場合、false を返し、追加要求を破棄して「サイレント フェイル」を生成します。

  1. BlockingCollection の .Add() メソッドは、バウンディング キャパシティを大幅に超えた後に「サイレント フェイル」したり、追加を失ったりするようには見えません。add() メソッドは、容量を超えた BlockingCollection への追加を待機している .add() が多すぎる場合、最終的にプロセスのメモリ不足を引き起こします。(これは、フロー制御の問題の非常に極端なケースでなければなりません)
  2. #1を参照してください。
  3. 私自身のテストでは、MSDN ドキュメントで説明されているように、CompleteAdding() メソッドが呼び出されると、後続のすべての追加が失敗することが示されているようです。

パフォーマンスに関する最後の注意事項

(私の場合はとにかく) BlockingCollection で Bounding Capacity と .Add() を使用すると、同じプロセスで Bounding Capacity と .TryAdd() を使用しない場合に比べて非常に遅いようです。

独自の境界容量戦略を実装することで、はるかに優れたパフォーマンス結果を達成しました。これを行うには多くの方法があります。3 つの選択肢には、Monitor.PulseAll() と一緒に使用される Thread.Sleep()、Thread.Spinwait()、または Monitor.Wait() が含まれます。これらの戦略のいずれかを使用する場合、BlockingCollection.Add() の代わりに BlockingCollection.TryAdd() を使用することもでき、データを失うこともメモリ不足になることもなく、境界容量を持たないこともできます。この方法もパフォーマンスが向上するようです。

Producer スレッドと Consumer スレッドの速度の違いに最適なシナリオに基づいて、3 つの例から選択できます。

Thread.Wait() 例:

//Check to see if the BlockingCollection's bounded capacity has been exceeded.
while (Tokens.Count > 50000)
{   //If the bounded capacity has been exceeded
    //place the thread in wait mode 
    Thread.Sleep(SleepTime);
}

Thread.SpinWait() 例:

//Check to see if the BlockingCollection's bounded capacity has been exceeded.
while (Tokens.Count > 50000)
{   //If the capacity has been exceeded
    //place the thread in wait mode 
    Thread.SpinWait(SpinCount);
}  

Monitor.Wait() の例

この例では、Producer 側と Consumer 側の両方にフックが必要です。

プロデューサーコード

//Check to see BlockingCollection capacity has been exceeded.
if (Tokens.Count > 50000)
{ 
    lock (syncLock)
    {   //Double check before waiting
        if (Tokens.Count > 50000)
        {
            Monitor.Wait(syncLock, 1000);
        }
    }
}

消費者コード

//Check to see BlockingCollection capacity is back a normal range.
if (Tokens.Count <= 40000)
{ 
    lock (syncLock)
    {   //Double check before waiting
        if (Tokens.Count < 40000)
        {
            Monitor.PulseAll(syncLock);
        }
    }
}
于 2012-11-03T19:27:59.423 に答える