2

複数のプロセッサを使用して、線形演算 (複雑な数学関数をデータセットに適合させる) を並列処理したいと考えています。

マシンに 8 つのコアがあり、1000 個のデータセットに適合させたいとします。私が期待しているのは、1000 個のデータセットをキューとして受け取り、それらを 8 つのコアに送信して処理するシステムです。そのため、1000 個のうち最初の 8 個を FIFO として取得することから始めます。一般に、各データセットのフィッティング時間は他のデータセットとは異なるため、フィッティングされる 8 つのデータセットの一部は他のデータセットよりも長くかかる場合があります。私がシステムに求めているのは、適合したデータ セットの結果を保存し、実行されたスレッドごとに大きなキュー (1000 データセット) から新しいデータセットの取得を再開することです。これは、1000 個のデータセット全体が処理されるまで再開する必要があります。そして、プログラムを進めることができました。

このような制度を何といいますか。C ++でそのためのモデルはありますか?

私は OpenMP と並列化し、テンプレートやポリモーフィズムなどの高度な C++ 手法を使用しています。

何卒よろしくお願いいたします。

4

2 に答える 2

1

動的スケジュールまたは OpenMP タスクで OpenMP 並列を使用できます。どちらも、各反復が完了するまでにかかる時間が異なるケースを並列化するために使用できます。動的にスケジュールされている場合:

#pragma omp parallel
{
   Fitter fitter;
   fitter.init();
   #pragma omp for schedule(dynamic,1)
   for (int i = 0; i < numFits; i++)
      fitter.fit(..., &results[i]);
}

schedule(dynamic,1)各スレッドが一度に 1 つの反復を実行し、処理する反復がなくなるまでスレッドがアイドル状態になることはありません。

タスクの場合:

#pragma omp parallel
{
   Fitter fitter;
   fitter.init();
   #pragma omp single
   for (int i = 0; i < numFits; i++)
   {
      #pragma omp task
      fitter.fit(..., &results[i]);
   }
   #pragma omp taskwait
   // ^^^ only necessary if more code before the end of the parallel region
}

ここで、スレッドの 1 つは、1000 個の OpenMP タスクを生成する for ループを実行します。OMP タスクはキューに保持され、アイドル スレッドによって処理されます。これは動的 for ループといくらか似た働きをしますが、コード構成の自由度が高くなります (たとえば、タスクを使用して再帰アルゴリズムを並列化できます)。コンストラクトは、保留中のtaskwaitすべてのタスクが完了するまで待機します。これは、並列領域の最後で暗示されるため、実際に必要になるのは、並列領域の終了前にさらにコードが続く場合のみです。

どちらの場合も、への各呼び出しfit()は別のスレッドで行われます。パラメータの 1 つのセットのフィッティングが他のセットのフィッティングに影響しないことを確認する必要があります。たとえば、それfit()はスレッドセーフなメソッド/関数です。どちらの場合もfit()、OpenMP コンストラクトのオーバーヘッドよりもはるかに長い実行時間が必要です。

OpenMP タスクには、OpenMP 3.0 準拠のコンパイラが必要です。これにより、MS VC++ のすべてのバージョン (VS2012 のバージョンも含む) が除外されます。

スレッドごとに初期化されるフィッターのインスタンスを 1 つだけにしたい場合は、フィッター オブジェクトをグローバルにするなど、多少異なるアプローチを取る必要がありますthreadprivate

#include <omp.h>

Fitter fitter;
#pragma omp threadprivate(fitter)

...

int main()
{
   // Disable dynamic teams
   omp_set_dynamic(0);

   // Initialise all fitters once per thread
   #pragma omp parallel
   {
      fitter.init();
   }

   ...

   #pragma omp parallel
   {
      #pragma omp for schedule(dynamic,1)
      for (int i = 0; i < numFits; i++)
         fitter.fit(..., &results[i]);
   }

   ...

   return 0;
 }

これは、クラスfitterのグローバル インスタンスです。Fitterこのomp threadprivateディレクティブは、スレッドごとのグローバル変数にするなど、スレッド ローカル ストレージに格納するようにコンパイラに指示します。これらは、異なる並列領域間で保持されます。ローカル変数omp threadprivateにも使用できます。staticこれらも、異なる並列領域間で保持されます (ただし、同じ関数内のみ)。

#include <omp.h>

int main()
{
   // Disable dynamic teams
   omp_set_dynamic(0);

   static Fitter fitter; // must be static
   #pragma omp threadprivate(fitter)

   // Initialise all fitters once per thread
   #pragma omp parallel
   {
      fitter.init();
   }

   ...

   #pragma omp parallel
   {
      #pragma omp for schedule(dynamic,1)
      for (int i = 0; i < numFits; i++)
         fitter.fit(..., &results[i]);
   }

   ...

   return 0;
 }

このomp_set_dynamic(0)呼び出しは動的チームを無効にします。つまり、各並列領域は常に、OMP_NUM_THREADS環境変数で指定された数のスレッドで実行されます。

于 2012-09-27T12:23:40.527 に答える
1

基本的に必要なのは、キューからジョブを取得して処理し、その後別のジョブに進むワーカーのプール (またはスレッド プール) です。OpenMP は、このようなタスクを処理するためのさまざまなアプローチを提供します。たとえば、バリア (すべてのワーカーが特定のポイントまで実行され、特定の要件が満たされた場合にのみ処理が続行される) や、ワーカーがそれぞれの部分を計算した後に値をグローバル変数に蓄積するリダクションなどです。

あなたの質問は非常に幅広いですが、私があなたに与えることができるもう 1 つのヒントは、MapReduce パラダイムを調べることです。このパラダイムでは、関数がデータセットにマップされ、結果が別の関数 (おそらく同じ関数である可能性があります) を使用して削減されるバケットに並べ替えられます。あなたの場合、これは、各プロセッサ/コア/ノードが、割り当てられたデータセットに特定の関数をマップし、結果バケットを別のノードに送信して結合することを意味します。特定の MapReduce フレームワークを使用せずに C++ で MapReduce を使用する場合は、MPI を調べる必要があると思います。1 つのノードでプログラムを実行しているため、OpenMP で同様のことができる可能性があるため、Web で検索すると役立つ場合があります。

TL;DRワーカーのプール(スレッド プール)、バリア、およびMapReduceを検索します。

于 2012-09-27T11:35:51.217 に答える