正しく指摘されたように、コードには正しい実行を妨げるいくつかの間違いがあるため、まずこれらのエラーを確認することをお勧めします。
とにかく、OpenMP のパフォーマンスがスレッドに応じてどのようにスケーリングするかのみを考慮すると、タスク ディレクティブに基づく実装は、以前の回答ですでに指摘されている制限を克服するため、より適している可能性があります。
セクション ディレクティブには 2 つのセクションしかないため、parallel 句で 2 つより多くのスレッドを生成してもメリットはないと思います
このような実装の痕跡を以下に示します。
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <sys/time.h>
void getTime(double *t) {
struct timeval tv;
gettimeofday(&tv, 0);
*t = tv.tv_sec + (tv.tv_usec * 1e-6);
}
int compare( const void * pa, const void * pb ) {
const int a = *((const int*) pa);
const int b = *((const int*) pb);
return (a-b);
}
void merge(int * array, int * workspace, int low, int mid, int high) {
int i = low;
int j = mid + 1;
int l = low;
while( (l <= mid) && (j <= high) ) {
if( array[l] <= array[j] ) {
workspace[i] = array[l];
l++;
} else {
workspace[i] = array[j];
j++;
}
i++;
}
if (l > mid) {
for(int k=j; k <= high; k++) {
workspace[i]=array[k];
i++;
}
} else {
for(int k=l; k <= mid; k++) {
workspace[i]=array[k];
i++;
}
}
for(int k=low; k <= high; k++) {
array[k] = workspace[k];
}
}
void mergesort_impl(int array[],int workspace[],int low,int high) {
const int threshold = 1000000;
if( high - low > threshold ) {
int mid = (low+high)/2;
/* Recursively sort on halves */
#ifdef _OPENMP
#pragma omp task
#endif
mergesort_impl(array,workspace,low,mid);
#ifdef _OPENMP
#pragma omp task
#endif
mergesort_impl(array,workspace,mid+1,high);
#ifdef _OPENMP
#pragma omp taskwait
#endif
/* Merge the two sorted halves */
#ifdef _OPENMP
#pragma omp task
#endif
merge(array,workspace,low,mid,high);
#ifdef _OPENMP
#pragma omp taskwait
#endif
} else if (high - low > 0) {
/* Coarsen the base case */
qsort(&array[low],high-low+1,sizeof(int),compare);
}
}
void mergesort(int array[],int workspace[],int low,int high) {
#ifdef _OPENMP
#pragma omp parallel
#endif
{
#ifdef _OPENMP
#pragma omp single nowait
#endif
mergesort_impl(array,workspace,low,high);
}
}
const size_t largest = 100000000;
const size_t length = 10000000;
int main(int argc, char *argv[]) {
int * array = NULL;
int * workspace = NULL;
double start,end;
printf("Largest random number generated: %d \n",RAND_MAX);
printf("Largest random number after truncation: %d \n",largest);
printf("Array size: %d \n",length);
/* Allocate and initialize random vector */
array = (int*) malloc(length*sizeof(int));
workspace = (int*) malloc(length*sizeof(int));
for( int ii = 0; ii < length; ii++)
array[ii] = rand()%largest;
/* Sort */
getTime(&start);
mergesort(array,workspace,0,length-1);
getTime(&end);
printf("Elapsed time sorting: %g sec.\n", end-start);
/* Check result */
for( int ii = 1; ii < length; ii++) {
if( array[ii] < array[ii-1] ) printf("Error:\n%d %d\n%d %d\n",ii-1,array[ii-1],ii,array[ii]);
}
free(array);
free(workspace);
return 0;
}
パフォーマンスを追求する場合は、再帰関数呼び出しによる実質的なオーバーヘッドを回避するために、再帰の基本ケースが十分に粗いことも保証する必要があることに注意してください。それ以外に、どの部分を最適化する価値があるかについて良いヒントを得ることができるように、コードをプロファイリングすることをお勧めします。