4

私のアプリケーションでは、3つのクラス、、、がExtractorありTransformerLoaderこれらは4番目のクラスで調整されていCoordinatorます。ExtractorTransformerおよびLoaderは非常に単純で、次のことを行います。

Extractor

たとえば、テキストファイルから読み取ることにより、Resultsタイプと呼ばれるメンバーを公開します。IEnumerable<string>抽出は同期する必要があります。

Transformer

Transform文字列を受け入れ、時間がかかると予想されるプロセスを介して別の文字列に変換するというメンバーを公開します(ここでは並列処理を使用します)。

Loader

Load文字列を受け入れ、それを最終的な形式(別のテキストファイルなど)にロードするというメンバーを公開します。読み込みは同期する必要があります。

Coordinatorクラスは3つの操作を調整します。変換プロセスは並行して実行する必要があり、次に結果をキューにプッシュして、ローダーによって読み取られます。CoordinatorRun()メソッドは次のようになります。

Extractor extractor = new Extractor();
Transformer transformer = new Transformer();
Loader loader = new Loader();

ConcurrentQueue<string> outputs = new ConcurrentQueue<string>();

Parallel.ForEach(extractor.Results, x => outputs.Enqueue(transformer.Transform(x)));

foreach(string output in outputs)
{
  loader.Load(output);
}

これはうまく機能していますが、ロードを実行する前にすべての変換を終了する必要があります。つまりParallel.ForEach()、次の開始前に完了しforeachます。準備ができ次第、各出力をローダーに渡すことをお勧めします。

私もこれを試しました:

Extractor extractor = new Extractor();
Transformer transformer = new Transformer();
Loader loader = new Loader();

ConcurrentQueue<string> outputs = new ConcurrentQueue<string>();

foreach (string input in extractor.Results)
{
  string input1 = input;
  Task task = Task.Factory.StartNew(
                    () => outputs.Enqueue(transformer.Transform(input1)));
}

foreach(string output in outputs)
{
  loader.Load(output);
}

ただし、foreach出力がキューに追加される前に下部のループがヒットするため、単に終了します。

の呼び出しから結果が利用可能になり次第、読み込みを実行するにはどうすればよいtransformer.Transform()ですか?

4

1 に答える 1

7

BlockingCollection代わりに。を試してくださいParallel.Invoke。以下の例では、GetConsumingEnumerable(プロデューサー-コンシューマーパターンのコンシューマーCompleteAdding部分)は呼び出されるまで終了しないため、完了loadするまで実行されfillます。

var outputs = new BlockingCollection<string>();

// aka Producer
Action fill = () => {
    Parallel.ForEach(extractor.Results, x => outputs.Add(transformer.Transform(x)));        
    outputs.CompleteAdding();
};

// aka Consumer
Action load = () => {
   foreach(var o in outputs.GetConsumingEnumerable()) 
       loader.Load(o);
}

Parallel.Invoke(fill, load);
于 2012-10-03T15:56:44.860 に答える