Cormen の well-known テキストから並列マージ ソート アルゴリズムを実装しました。pthreads を使用して C で記述し、Win7 x64 で MinGW を使用してコンパイルしました (後で Ubuntu の GCC でテストしても同じ結果が得られました)。並列化における私の最初のアプローチはナイーブでした... 私はすべての再帰レベルで新しいスレッドを生成しました (これは実際に Cormen の疑似コードが意味するものです)。ただし、これには通常、時間がかかりすぎるか、セグメンテーション違反が原因でクラッシュします (システムが処理できるスレッド数には厳しい制限があると思います)。これは、再帰的並列化の初心者によくある間違いのようです。実際、私は同様のディスカッションを見つけました。このサイトで。そのため、代わりにそのスレッドの推奨事項を使用しました。つまり、問題サイズのしきい値を設定し、新しいスレッドを生成する関数にしきい値よりも小さいセット (たとえば 10,000 要素) が与えられた場合、要素を直接操作するのではなく、このような小さなセットの新しいスレッドを作成します。
今、すべてがうまく機能しているように見えました。以下に結果の一部を表にしました。N は問題のサイズ (完全にスクランブルされた整数 [1, 2, 3, ..., N] のセット) であり、しきい値は並列ソートおよび並列マージ関数が新しいスレッドの生成を拒否する値です。最初の表はソート時間をミリ秒で示し、2 番目の表は、それぞれの場合に生成されたソート/マージ ワーカー スレッドの数を示します。一番下のテーブルの N=1E6 行と N=1E7 行を見ると、最大 8000 を超えるマージ ワーカーが許可されるようにしきい値を下げると、セグメンテーション エラーが発生することがわかります。繰り返しますが、これはシステムがスレッドに与える何らかの制限によるものだと思います。それについてもっと知りたいのですが、それは私の主な質問ではありません。
主な問題は、かなり高いしきい値を使用しようとすると、最終行で segfault が発生する理由です。これにより、予想される 15/33 ワーカー スレッドが生成されます (前の行のパターンに従います)。確かに、これは私のシステムが処理するには多すぎるスレッドではありません。完了した 1 つのインスタンス (表の右下のセル) は約 1.2GB の RAM を使用し (私のシステムには 6GB あります)、スレッド化されたバージョンは、各行の右側にスレッドが 0 のインスタンスと比較して、より多くの RAM を使用しているようには見えません。
- ヒープ制限に達しているとは思いません...大量のRAMが利用可能で、15/33スレッドの生成が許可されていても、最大1GBしかかかりません。
- また、スタックの問題ではないと思います。私は最小限のスタックを使用するようにプログラムを設計しましたが、各スレッドのフットプリントが問題のサイズ N に関連することはまったくなく、ヒープのみに関連すると思います。私はこれにかなり慣れていません...しかし、gdbでコアダンプスタックバックトレースを行ったところ、スタックの上部から下部までのアドレスは、オーバーフローを除外するのに十分近いようです。
- pthread_create の戻り値を読み取ろうとしました... Windows では、クラッシュの前に数回 11 の値を取得しました (ただし、11 がいくつかあり、次に 0 がいくつかあったため、クラッシュを引き起こしたようには見えませんでした。エラーがない場合は、別の 11)。そのエラー コードは EAGAIN で、リソースを利用できません... しかし、ここでそれが実際に何を意味するのかはわかりません。さらに、Ubuntuでは、エラーコードはクラッシュするまで毎回0でした。
- Valgrind を試してみたところ、メモリ リークに関する多くのメッセージが表示されましたが、Valgrind には追加のリソースが必要であることを知っているため、それらが正当なものかどうかはわかりません。
問題のサイズとシステムリソースに関連していることは明らかです...答えを本当に明確にするために、私が見逃している一般的な知識がいくつかあることを願っています。
何か案は?文章が長くてすみません… ここまで読んでくれてありがとう!関連性があると思われる場合は、ソースを投稿できます。
編集:参照用に追加されたソース:
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>
#include <pthread.h>
const int N = 100000000;
const int SORT_THRESHOLD = 10000000;
const int MERGE_THRESHOLD = 10000000;
int sort_thread_count = 0;
int merge_thread_count = 0;
typedef struct s_pmergesort_args {
int *vals_in, p, r, *vals_out, s;
} pmergesort_args;
typedef struct s_pmerge_args {
int *temp, p1, r1, p2, r2, *vals_out, p3;
} pmerge_args;
void *p_merge_sort(void *v_pmsa);
void *p_merge(void *v_pma);
int binary_search(int val, int *temp, int p, int r);
int main() {
int *values, i, rand1, rand2, temp, *sorted;
long long rand1a, rand1b, rand2a, rand2b;
struct timeval start, end;
/* allocate values on heap and initialize */
values = malloc(N * sizeof(int));
sorted = malloc(N * sizeof(int));
for (i = 0; i < N; i++) {
values[i] = i + 1;
sorted[i] = 0;
}
/* scramble
* - complicated logic to maximize swapping
* - lots of testing (not shown) was done to verify optimal swapping */
srand(time(NULL));
for (i = 0; i < N/10; i++) {
rand1a = (long long)(N*((double)rand()/(1+(double)RAND_MAX)));
rand1b = (long long)(N*((double)rand()/(1+(double)RAND_MAX)));
rand1 = (int)((rand1a * rand1b + rand()) % N);
rand2a = (long long)(N*((double)rand()/(1+(double)RAND_MAX)));
rand2b = (long long)(N*((double)rand()/(1+(double)RAND_MAX)));
rand2 = (int)((rand2a * rand2b + rand()) % N);
temp = values[rand1];
values[rand1] = values[rand2];
values[rand2] = temp;
}
/* set up args for p_merge_sort */
pmergesort_args pmsa;
pmsa.vals_in = values;
pmsa.p = 0;
pmsa.r = N-1;
pmsa.vals_out = sorted;
pmsa.s = 0;
/* sort */
gettimeofday(&start, NULL);
p_merge_sort(&pmsa);
gettimeofday(&end, NULL);
/* verify sorting */
for (i = 1; i < N; i++) {
if (sorted[i] < sorted[i-1]) {
fprintf(stderr, "Error: array is not sorted.\n");
exit(0);
}
}
printf("Success: array is sorted.\n");
printf("Sorting took %dms.\n", (int)(((end.tv_sec * 1000000 + end.tv_usec) - (start.tv_sec * 1000000 + start.tv_usec))/1000));
free(values);
free(sorted);
printf("( sort threads created: %d )\n", sort_thread_count);
printf("( merge threads created: %d )\n", merge_thread_count);
return 0;
}
void *p_merge_sort(void *v_pmsa) {
pmergesort_args pmsa = *((pmergesort_args *) v_pmsa);
int *vals_in = pmsa.vals_in;
int p = pmsa.p;
int r = pmsa.r;
int *vals_out = pmsa.vals_out;
int s = pmsa.s;
int n = r - p + 1;
pthread_t worker;
if (n > SORT_THRESHOLD) {
sort_thread_count++;
}
if (n == 1) {
vals_out[s] = vals_in[p];
} else {
int *temp = malloc(n * sizeof(int));
int q = (p + r) / 2;
int q_ = q - p + 1;
pmergesort_args pmsa_l;
pmsa_l.vals_in = vals_in;
pmsa_l.p = p;
pmsa_l.r = q;
pmsa_l.vals_out = temp;
pmsa_l.s = 0;
pmergesort_args pmsa_r;
pmsa_r.vals_in = vals_in;
pmsa_r.p = q+1;
pmsa_r.r = r;
pmsa_r.vals_out = temp;
pmsa_r.s = q_;
if (n > SORT_THRESHOLD) {
pthread_create(&worker, NULL, p_merge_sort, &pmsa_l);
} else {
p_merge_sort(&pmsa_l);
}
p_merge_sort(&pmsa_r);
if (n > SORT_THRESHOLD) {
pthread_join(worker, NULL);
}
pmerge_args pma;
pma.temp = temp;
pma.p1 = 0;
pma.r1 = q_ - 1;
pma.p2 = q_;
pma.r2 = n - 1;
pma.vals_out = vals_out;
pma.p3 = s;
p_merge(&pma);
free(temp);
}
}
void *p_merge(void *v_pma) {
pmerge_args pma = *((pmerge_args *) v_pma);
int *temp = pma.temp;
int p1 = pma.p1;
int r1 = pma.r1;
int p2 = pma.p2;
int r2 = pma.r2;
int *vals_out = pma.vals_out;
int p3 = pma.p3;
int n1 = r1 - p1 + 1;
int n2 = r2 - p2 + 1;
int q1, q2, q3, t;
pthread_t worker;
if (n1 < n2) {
t = p1; p1 = p2; p2 = t;
t = r1; r1 = r2; r2 = t;
t = n1; n1 = n2; n2 = t;
}
if (n1 > MERGE_THRESHOLD) {
merge_thread_count++;
}
if (n1 == 0) {
return;
} else {
q1 = (p1 + r1) / 2;
q2 = binary_search(temp[q1], temp, p2, r2);
q3 = p3 + (q1 - p1) + (q2 - p2);
vals_out[q3] = temp[q1];
pmerge_args pma_l;
pma_l.temp = temp;
pma_l.p1 = p1;
pma_l.r1 = q1-1;
pma_l.p2 = p2;
pma_l.r2 = q2-1;
pma_l.vals_out = vals_out;
pma_l.p3 = p3;
if (n1 > MERGE_THRESHOLD) {
pthread_create(&worker, NULL, p_merge, &pma_l);
} else {
p_merge(&pma_l);
}
pmerge_args pma_r;
pma_r.temp = temp;
pma_r.p1 = q1+1;
pma_r.r1 = r1;
pma_r.p2 = q2;
pma_r.r2 = r2;
pma_r.vals_out = vals_out;
pma_r.p3 = q3+1;
p_merge(&pma_r);
if (n1 > MERGE_THRESHOLD) {
pthread_join(worker, NULL);
}
}
}
int binary_search(int val, int *temp, int p, int r) {
int low = p;
int mid;
int high = (p > r+1)? p : r+1;
while (low < high) {
mid = (low + high) / 2;
if (val <= temp[mid]) {
high = mid;
} else {
low = mid + 1;
}
}
return high;
}
EDIT 2:各バージョンで使用される「最大」および「合計」RAMを示す新しい画像を下に追加しました(最大は最大の同時割り当て/使用を意味し、合計はプログラムの存続期間中のすべての割り当て要求の合計を意味します)。これらは、N=1E8 およびしきい値=1E7 の場合、3.2GB の最大使用量を取得する必要があることを示唆しています (システムがサポートできるはずです)。しかし、繰り返しになりますが、これは、実際のシステム リソースではなく、pthread ライブラリの他の制限に関連していると思います。