データの処理に並列化を利用するアプリケーションがあります。
メインプログラムはC#にあり、データを分析するためのルーチンの1つは外部C++dllにあります。このライブラリはデータをスキャンし、データ内に特定の信号が見つかるたびにコールバックを呼び出します。データを収集、並べ替えてから、HDに保存する必要があります。
これは、コールバックによって呼び出されるメソッドと、データを並べ替えて保存するためのメソッドの最初の簡単な実装です。
// collection where saving found signals
List<MySignal> mySignalList = new List<MySignal>();
// method invoked by the callback
private void Collect(int type, long time)
{
lock(locker) { mySignalList.Add(new MySignal(type, time)); }
}
// store signals to disk
private void Store()
{
// sort the signals
mySignalList.Sort();
// file is a object that manages the writing of data to a FileStream
file.Write(mySignalList.ToArray());
}
データは、サイズ10000 xnの2次元配列(short [] []データ)で構成され、n変数があります。私はこのように並列化を使用します:
Parallel.For(0, 10000, (int i) =>
{
// wrapper for the external c++ dll
ProcessData(data[i]);
}
ここで、10000の配列のそれぞれについて、0から4のコールバックが発生する可能性があると推定します。私はボトルネックに直面しており、CPUリソースが過剰に使用されていないことを考えると、ロック(数千のコールバックと一緒に)が問題であると思います(私は正しいですか、それとも何か他のものがある可能性がありますか?)。ConcurrentBagコレクションを試しましたが、パフォーマンスはさらに悪くなっています(他のユーザーの調査結果と一致しています)。
ロックフリーコードを使用するための可能な解決策は、複数のコレクションを持つことだと思いました。次に、並列プロセスの各スレッドを単一のコレクションで機能させるための戦略が必要になります。コレクションは、たとえばスレッドIDをキーとするディクショナリ内にある可能性がありますが、このための.NET機能はわかりません(並列化を開始する前にディクショナリを初期化するためのスレッドIDを知っている必要があります)。このアイデアは実現可能でしょうか。そうであれば、このための.NETツールは存在しますか?または、プロセスをスピードアップする他のアイデアはありますか?
[編集]ReedCopseyの提案に従い、次のソリューションを使用しました(VS2010のプロファイラーによると、リストのロックと追加の負担がリソースの15%を占める前は、現在は1%にすぎません)。
// master collection where saving found signals
List<MySignal> mySignalList = new List<MySignal>();
// thread-local storage of data (each thread is working on its List<MySignal>)
ThreadLocal<List<MySignal>> threadLocal;
// analyze data
private void AnalizeData()
{
using(threadLocal = new ThreadLocal<List<MySignal>>(() =>
{ return new List<MySignal>(); }))
{
Parallel.For<int>(0, 10000,
() =>
{ return 0;},
(i, loopState, localState) =>
{
// wrapper for the external c++ dll
ProcessData(data[i]);
return 0;
},
(localState) =>
{
lock(this)
{
// add thread-local lists to the master collection
mySignalList.AddRange(local.Value);
local.Value.Clear();
}
});
}
}
// method invoked by the callback
private void Collect(int type, long time)
{
local.Value.Add(new MySignal(type, time));
}