大量のデータ セットを反復処理し、処理された結果をシリアル化のためにコンシューマー スレッドに返す必要があるメソッドがあります。ストリーミング PLinq は、パフォーマンス面で最適です。
これらの操作は頻繁に行われるため、オブジェクトの作成を最小限に抑えるために、オブジェクトプールを使用して処理用のコンテナーをキャッシュしています。私は、concurrentstack を使用して objectpool を実装しようとしました (concurrentbag と concurrentqueue は同じ問題を示します)。まれに、同じアイテム (ハッシュコードを参照) が同じスレッドによってプールから取得されますが、コンシューマー スレッドによって解放されたわけではありません。プールの取得メソッドと解放メソッドにトレースを追加しました。これが出力です。
5:11:32.250 PM スレッド 31 のアイテム 16071020 を取得
5:11:32.254 PM スレッド 31 のアイテム 16071020 を取得
5:11:32.260 PM スレッド 27
のアイテム 16071020 を配置
ここに私が使用しているコードがあります:
var itemsToProcess = data.AsParallel()
.Where(x => Filter(x))
.Select(row => Process(row));
Process メソッドでは、プールからオブジェクトを取得します。
result = ObjectPool.Instance.GetObject();
Pool クラスの実装:
public class ObjectPool
{
private ConcurrentStack<object[]> _objects;
private int size;
private const int maxSize = 20000;
private static ObjectPool instance = new ObjectPool(500);
public static ObjectPool Instance
{
get { return instance; }
}
private ObjectPool(int size)
{
this.size = size;
_objects = new ConcurrentStack<object[]>();
}
public object[] GetObject()
{
object[] item;
if (_objects.TryPop(out item))
{
Trace.WriteLine(string.Format("Get item {0} for Thread {1}", item.GetHashCode(), Thread.CurrentThread.ManagedThreadId));
return item;
}
return new object[size];
}
public void Clear()
{
_objects.Clear();
}
public void PutObject(object[] item)
{
Trace.WriteLine(string.Format("Put item {0} for Thread {1}", item.GetHashCode(), Thread.CurrentThread.ManagedThreadId));
if (_objects.Count < maxSize)
{
_objects.Push(item);
}
}
}
このような状況が発生するのを防ぐ方法について、私は途方に暮れています。なぜこれが起こるのか、そしてそれを防ぐ方法についてのアイデアはありますか?