8

PLINQでParallelQueryを返す拡張関数を作成する方法を知っている人はいますか?

具体的には、次の問題があります。エンジンが必要なPLINQクエリ内で変換を実行したいのですが、エンジンの作成にはコストがかかり、同時にアクセスすることはできません。

私は次のことができます:

var result = source.AsParallel ().Select ( (i) => { var e = new Engine (); return e.Process(i); } )

ここでは、エンジンはアイテムごとに1回作成されますが、これは高すぎます。

エンジンをスレッドごとに1回作成したい。

Aggregateを使用すると、次のようなもので必要なものに近づくことができます。

// helper class: engine to use plus list of results obtained in thread so far
class EngineAndResults {
   public Engine engine = null;
   public IEnumerable<ResultType> results;
}

var result = source.AsParallel ().Aggregate (

   // done once per block of items (=thread),
   // returning an empty list, but a new engine
   () => new EngineAndList () {
       engine = new Engine (),
       results = Enumerable.Empty<ResultType> ()
   },

   // we process a new item and put it to the thread-local list,
   // preserving the engine for further use
   (engineAndResults, item) => new EngineAndResults () {
       engine = engineAndResults.engine,
       results = Enumerable.Concat (
           engineAndResults.results,
           new ResultType [] { engineAndResults.engine.Process (item) }
       )
   },

   // tell linq how to aggregate across threads
   (engineAndResults1, engineAndResults2) => new EngineAndResults () {
       engine = engineAndResults1.engine,
       results = Enumerable.Concat (engineAndResults1.results, engineAndResults2.results)
   },

   // after all aggregations, how do we come to the result?
   engineAndResults => engineAndResults.results
);

ご覧のとおり、スレッドごとにエンジンを運ぶためにアキュムレータを誤用しています。ここでの問題は、最終的にPLINQが結果を単一のIEnumerableに集約し、スレッドが同期されることです。後で別のPLINQ拡張機能を追加したい場合、これはあまり良くありません。

私は次のようなものをいただければ幸いです

   var result = source.AsParallel ()
                  .SelectWithThreadwiseInitWhichIAmLookingFor (
                       () => new Engine (),
                       (engine, item) => engine.Process (item)
              )

誰かがこれを達成する方法を知っていますか?

4

1 に答える 1

5

これを行うために使用できますThreadLocal<T>。何かのようなもの:

var engine = new ThreadLocal<Engine>(() => new Engine());
var result = source.AsParallel()
                   .Select(item => engine.Value.Process(item));
于 2012-06-22T13:33:52.877 に答える