1

Cormen の well-known テキストから並列マージ ソート アルゴリズムを実装しました。pthreads を使用して C で記述し、Win7 x64 で MinGW を使用してコンパイルしました (後で Ubuntu の GCC でテストしても同じ結果が得られました)。並列化における私の最初のアプローチはナイーブでした... 私はすべての再帰レベルで新しいスレッドを生成しました (これは実際に Cormen の疑似コードが意味するものです)。ただし、これには通常、時間がかかりすぎるか、セグメンテーション違反が原因でクラッシュします (システムが処理できるスレッド数には厳しい制限があると思います)。これは、再帰的並列化の初心者によくある間違いのようです。実際、私は同様のディスカッションを見つけました。このサイトで。そのため、代わりにそのスレッドの推奨事項を使用しました。つまり、問題サイズのしきい値を設定し、新しいスレッドを生成する関数にしきい値よりも小さいセット (たとえば 10,000 要素) が与えられた場合、要素を直接操作するのではなく、このような小さなセットの新しいスレッドを作成します。

今、すべてがうまく機能しているように見えました。以下に結果の一部を表にしました。N は問題のサイズ (完全にスクランブルされた整数 [1, 2, 3, ..., N] のセット) であり、しきい値は並列ソートおよび並列マージ関数が新しいスレッドの生成を拒否する値です。最初の表はソート時間をミリ秒で示し、2 番目の表は、それぞれの場合に生成されたソート/マージ ワーカー スレッドの数を示します。一番下のテーブルの N=1E6 行と N=1E7 行を見ると、最大 8000 を超えるマージ ワーカーが許可されるようにしきい値を下げると、セグメンテーション エラーが発生することがわかります。繰り返しますが、これはシステムがスレッドに与える何らかの制限によるものだと思います。それについてもっと知りたいのですが、それは私の主な質問ではありません。

主な問題は、かなり高いしきい値を使用しようとすると、最終行で segfault が発生する理由です。これにより、予想される 15/33 ワーカー スレッドが生成されます (前の行のパターンに従います)。確かに、これは私のシステムが処理するには多すぎるスレッドではありません。完了した 1 つのインスタンス (表の右下のセル) は約 1.2GB の RAM を使用し (私のシステムには 6GB あります)、スレッド化されたバージョンは、各行の右側にスレッドが 0 のインスタンスと比較して、より多くの RAM を使用しているようには見えません。

  • ヒープ制限に達しているとは思いません...大量のRAMが利用可能で、15/33スレッドの生成が許可されていても、最大1GBしかかかりません。
  • また、スタックの問題ではないと思います。私は最小限のスタックを使用するようにプログラムを設計しましたが、各スレッドのフットプリントが問題のサイズ N に関連することはまったくなく、ヒープのみに関連すると思います。私はこれにかなり慣れていません...しかし、gdbでコアダンプスタックバックトレースを行ったところ、スタックの上部から下部までのアドレスは、オーバーフローを除外するのに十分近いようです。
  • pthread_create の戻り値を読み取ろうとしました... Windows では、クラッシュの前に数回 11 の値を取得しました (ただし、11 がいくつかあり、次に 0 がいくつかあったため、クラッシュを引き起こしたようには見えませんでした。エラーがない場合は、別の 11)。そのエラー コードは EAGAIN で、リソースを利用できません... しかし、ここでそれが実際に何を意味するのかはわかりません。さらに、Ubuntuでは、エラーコードはクラッシュするまで毎回0でした。
  • Valgrind を試してみたところ、メモリ リークに関する多くのメッセージが表示されましたが、Valgrind には追加のリソースが必要であることを知っているため、それらが正当なものかどうかはわかりません。

問題のサイズとシステムリソースに関連していることは明らかです...答えを本当に明確にするために、私が見逃している一般的な知識がいくつかあることを願っています。

何か案は?文章が長くてすみません… ここまで読んでくれてありがとう!関連性があると思われる場合は、ソースを投稿できます。

ここに画像の説明を入力

編集:参照用に追加されたソース:

#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>
#include <pthread.h>

const int               N = 100000000;
const int  SORT_THRESHOLD = 10000000;
const int MERGE_THRESHOLD = 10000000;

int  sort_thread_count = 0;
int merge_thread_count = 0;

typedef struct s_pmergesort_args {
    int *vals_in, p, r, *vals_out, s;
} pmergesort_args;

typedef struct s_pmerge_args {
    int *temp, p1, r1, p2, r2, *vals_out, p3;
} pmerge_args;

void *p_merge_sort(void *v_pmsa);
void *p_merge(void *v_pma);
int binary_search(int val, int *temp, int p, int r);

int main() {
    int *values, i, rand1, rand2, temp, *sorted;
    long long rand1a, rand1b, rand2a, rand2b;
    struct timeval start, end;

    /* allocate values on heap and initialize */
    values = malloc(N * sizeof(int));
    sorted = malloc(N * sizeof(int));
    for (i = 0; i < N; i++) {
        values[i] = i + 1;
        sorted[i] = 0;
    }

    /* scramble
     *  - complicated logic to maximize swapping
     *  - lots of testing (not shown) was done to verify optimal swapping */
    srand(time(NULL));
    for (i = 0; i < N/10; i++) {
        rand1a = (long long)(N*((double)rand()/(1+(double)RAND_MAX)));
        rand1b = (long long)(N*((double)rand()/(1+(double)RAND_MAX)));
        rand1 = (int)((rand1a * rand1b + rand()) % N);
        rand2a = (long long)(N*((double)rand()/(1+(double)RAND_MAX)));
        rand2b = (long long)(N*((double)rand()/(1+(double)RAND_MAX)));
        rand2 = (int)((rand2a * rand2b + rand()) % N);
        temp = values[rand1];
        values[rand1] = values[rand2];
        values[rand2] = temp;
    }

    /* set up args for p_merge_sort */
    pmergesort_args pmsa;
    pmsa.vals_in = values;
    pmsa.p = 0;
    pmsa.r = N-1;
    pmsa.vals_out = sorted;
    pmsa.s = 0;

    /* sort */
    gettimeofday(&start, NULL);
    p_merge_sort(&pmsa);
    gettimeofday(&end, NULL);

    /* verify sorting */
    for (i = 1; i < N; i++) {
        if (sorted[i] < sorted[i-1]) {
            fprintf(stderr, "Error: array is not sorted.\n");
            exit(0);
        }
    }
    printf("Success: array is sorted.\n");
    printf("Sorting took %dms.\n", (int)(((end.tv_sec * 1000000 + end.tv_usec) - (start.tv_sec * 1000000 + start.tv_usec))/1000));

    free(values);
    free(sorted);

    printf("(  sort threads created: %d )\n", sort_thread_count);
    printf("( merge threads created: %d )\n", merge_thread_count);

    return 0;
}

void *p_merge_sort(void *v_pmsa) {
    pmergesort_args pmsa = *((pmergesort_args *) v_pmsa);
    int *vals_in = pmsa.vals_in;
    int p = pmsa.p;
    int r = pmsa.r;
    int *vals_out = pmsa.vals_out;
    int s = pmsa.s;

    int n = r - p + 1;
    pthread_t worker;

    if (n > SORT_THRESHOLD) {
        sort_thread_count++;
    }

    if (n == 1) {
        vals_out[s] = vals_in[p];
    } else {
        int *temp = malloc(n * sizeof(int));
        int q = (p + r) / 2;
        int q_ = q - p + 1;

        pmergesort_args pmsa_l;
        pmsa_l.vals_in = vals_in;
        pmsa_l.p = p;
        pmsa_l.r = q;
        pmsa_l.vals_out = temp;
        pmsa_l.s = 0;

        pmergesort_args pmsa_r;
        pmsa_r.vals_in = vals_in;
        pmsa_r.p = q+1;
        pmsa_r.r = r;
        pmsa_r.vals_out = temp;
        pmsa_r.s = q_;

        if (n > SORT_THRESHOLD) {
            pthread_create(&worker, NULL, p_merge_sort, &pmsa_l);
        } else {
            p_merge_sort(&pmsa_l);
        }
        p_merge_sort(&pmsa_r);

        if (n > SORT_THRESHOLD) {
            pthread_join(worker, NULL);
        }

        pmerge_args pma;
        pma.temp = temp;
        pma.p1 = 0;
        pma.r1 = q_ - 1;
        pma.p2 = q_;
        pma.r2 = n - 1;
        pma.vals_out = vals_out;
        pma.p3 = s;
        p_merge(&pma);
        free(temp);
    }
}

void *p_merge(void *v_pma) {
    pmerge_args pma = *((pmerge_args *) v_pma);
    int *temp = pma.temp;
    int p1 = pma.p1;
    int r1 = pma.r1;
    int p2 = pma.p2;
    int r2 = pma.r2;
    int *vals_out = pma.vals_out;
    int p3 = pma.p3;

    int n1 = r1 - p1 + 1;
    int n2 = r2 - p2 + 1;
    int q1, q2, q3, t;
    pthread_t worker;

    if (n1 < n2) {
        t = p1; p1 = p2; p2 = t;
        t = r1; r1 = r2; r2 = t;
        t = n1; n1 = n2; n2 = t;
    }
    if (n1 > MERGE_THRESHOLD) {
        merge_thread_count++;
    }

    if (n1 == 0) {
        return;
    } else {

        q1 = (p1 + r1) / 2;
        q2 = binary_search(temp[q1], temp, p2, r2);
        q3 = p3 + (q1 - p1) + (q2 - p2);
        vals_out[q3] = temp[q1];

        pmerge_args pma_l;
        pma_l.temp = temp;
        pma_l.p1 = p1;
        pma_l.r1 = q1-1;
        pma_l.p2 = p2;
        pma_l.r2 = q2-1;
        pma_l.vals_out = vals_out;
        pma_l.p3 = p3;

        if (n1 > MERGE_THRESHOLD) {
            pthread_create(&worker, NULL, p_merge, &pma_l);
        } else {        
            p_merge(&pma_l);
        }        

        pmerge_args pma_r;
        pma_r.temp = temp;
        pma_r.p1 = q1+1;
        pma_r.r1 = r1;
        pma_r.p2 = q2;
        pma_r.r2 = r2;
        pma_r.vals_out = vals_out;
        pma_r.p3 = q3+1;

        p_merge(&pma_r);

        if (n1 > MERGE_THRESHOLD) {
            pthread_join(worker, NULL);
        }
    }
}

int binary_search(int val, int *temp, int p, int r) {
    int low = p;
    int mid;
    int high = (p > r+1)? p : r+1;

    while (low < high) {
        mid = (low + high) / 2;
        if (val <= temp[mid]) {
            high = mid;
        } else {
            low = mid + 1;
        }
    }
    return high;
}

EDIT 2:各バージョンで使用される「最大」および「合計」RAMを示す新しい画像を下に追加しました(最大は最大の同時割り当て/使用を意味し、合計はプログラムの存続期間中のすべての割り当て要求の合計を意味します)。これらは、N=1E8 およびしきい値=1E7 の場合、3.2GB の最大使用量を取得する必要があることを示唆しています (システムがサポートできるはずです)。しかし、繰り返しになりますが、これは、実際のシステム リソースではなく、pthread ライブラリの他の制限に関連していると思います。

ここに画像の説明を入力

4

3 に答える 3

3

メモリが不足しているようです。あなたの例では、コードが順次実行される場合、一度に割り当てられる最大のメモリは 1.6GB です。スレッドを使用する場合、3GB 以上を使用しています。malloc/free 関数の周りにいくつかのラッパーを配置し、次の結果を得ました。

Allocation of 12500000 bytes failed with 3074995884 bytes already allocated.

スレッド化するとメモリ使用量が増えることは容易にわかります。その場合、配列全体の左側と右側の両方を同時にソートし、それを行うために 2 つの大きな一時バッファーを割り当てます。順次実行すると、左半分の一時バッファが解放されてから、右半分がソートされます。

使用したラッパーは次のとおりです。

static size_t total_allocated = 0;
static size_t max_allocated = 0;
static pthread_mutex_t total_allocated_mutex;

static void *allocate(int n)
{
  void *result = 0;
  pthread_mutex_lock(&total_allocated_mutex);
  result = malloc(n);
  if (!result) {
    fprintf(stderr,"Allocation of %d bytes failed with %u bytes already allocated\n",n,total_allocated);
  }
  assert(result);
  total_allocated += n;
  if (total_allocated>max_allocated) {
    max_allocated = total_allocated;
  }
  pthread_mutex_unlock(&total_allocated_mutex);
  return result;
}


static void *deallocate(void *p,int n)
{
  pthread_mutex_lock(&total_allocated_mutex);
  total_allocated -= n;
  free(p);
  pthread_mutex_unlock(&total_allocated_mutex);
}
于 2012-07-08T05:07:31.243 に答える
2

I ran it and got:

Program received signal SIGSEGV, Segmentation fault.
[Switching to Thread 7120.0x14dc]
0x004017df in p_merge (v_pma=0x7882c120) at t.c:177
177             vals_out[q3] = temp[q1];
(gdb) p q3
$1 = 58
(gdb) p vals_out
$2 = (int *) 0x0
(gdb) 

This is a NULL pointer dereference. I would put an assertion after you allocate temp to make sure allocation succeeded.

    int *temp = malloc(n * sizeof(int));
    assert(temp);

Analyzing your algorithm a bit, it seems you are pre-allocating the memory you need to do the merge as you recursively go down. You might want to consider altering your algorithm to do the allocation at the time you actually perform the merge.

But, if I recall correctly, merge sort allocates the second array at the very top of the algorithm before any merging occurs, then as the recursive calls unwind, they flip back and forth between the two arrays as the merge runs get longer. This way, there is only one malloc call ever in the whole algorithm. In addition to using less memory, it will perform much better.

My SWAG at modifying your code to use a single allocated temporary array allocated at the top of the algorithm is shown below.

#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>
#include <pthread.h>

const int               N = 100000000;
const int  SORT_THRESHOLD = 10000000;
const int MERGE_THRESHOLD = 10000000;

int  sort_thread_count = 0;
int merge_thread_count = 0;

typedef struct s_pmergesort_args {
    int *vals_in, p, r, *vals_out, s, *temp;
} pmergesort_args;

typedef struct s_pmerge_args {
    int *temp, p1, r1, p2, r2, *vals_out, p3;
} pmerge_args;

void *p_merge_sort(void *v_pmsa);
void *p_merge(void *v_pma);
int binary_search(int val, int *temp, int p, int r);

int main() {
    int *values, i, rand1, rand2, temp, *sorted, *scratch;
    long long rand1a, rand1b, rand2a, rand2b;
    struct timeval start, end;

    /* allocate values on heap and initialize */
    values = malloc(N * sizeof(int));
    sorted = malloc(N * sizeof(int));
    scratch = malloc(N * sizeof(int));
    for (i = 0; i < N; i++) {
        values[i] = i + 1;
        sorted[i] = 0;
    }

    /* scramble
     *  - complicated logic to maximize swapping
     *  - lots of testing (not shown) was done to verify optimal swapping */
    srand(time(NULL));
    for (i = 0; i < N/10; i++) {
        rand1a = (long long)(N*((double)rand()/(1+(double)RAND_MAX)));
        rand1b = (long long)(N*((double)rand()/(1+(double)RAND_MAX)));
        rand1 = (int)((rand1a * rand1b + rand()) % N);
        rand2a = (long long)(N*((double)rand()/(1+(double)RAND_MAX)));
        rand2b = (long long)(N*((double)rand()/(1+(double)RAND_MAX)));
        rand2 = (int)((rand2a * rand2b + rand()) % N);
        temp = values[rand1];
        values[rand1] = values[rand2];
        values[rand2] = temp;
    }

    /* set up args for p_merge_sort */
    pmergesort_args pmsa;
    pmsa.vals_in = values;
    pmsa.p = 0;
    pmsa.r = N-1;
    pmsa.vals_out = sorted;
    pmsa.s = 0;
    pmsa.temp = scratch;

    /* sort */
    gettimeofday(&start, NULL);
    p_merge_sort(&pmsa);
    gettimeofday(&end, NULL);

    /* verify sorting */
    for (i = 1; i < N; i++) {
        if (sorted[i] < sorted[i-1]) {
            fprintf(stderr, "Error: array is not sorted.\n");
            exit(0);
        }
    }
    printf("Success: array is sorted.\n");
    printf("Sorting took %dms.\n", (int)(((end.tv_sec * 1000000 + end.tv_usec) - (start.tv_sec * 1000000 + start.tv_usec))/1000));

    free(values);
    free(sorted);
    free(scratch);

    printf("(  sort threads created: %d )\n", sort_thread_count);
    printf("( merge threads created: %d )\n", merge_thread_count);

    return 0;
}

void *p_merge_sort(void *v_pmsa) {
    pmergesort_args pmsa = *((pmergesort_args *) v_pmsa);
    int *vals_in = pmsa.vals_in;
    int p = pmsa.p;
    int r = pmsa.r;
    int *vals_out = pmsa.vals_out;
    int s = pmsa.s;
    int *scratch = pmsa.temp;

    int n = r - p + 1;
    pthread_t worker;

    if (n > SORT_THRESHOLD) {
        sort_thread_count++;
    }

    if (n == 1) {
        vals_out[s] = vals_in[p];
    } else {
        int q = (p + r) / 2;
        int q_ = q - p + 1;

        pmergesort_args pmsa_l;
        pmsa_l.vals_in = vals_in;
        pmsa_l.p = p;
        pmsa_l.r = q;
        pmsa_l.vals_out = scratch;
        pmsa_l.s = p;
        pmsa_l.temp = vals_out;

        pmergesort_args pmsa_r;
        pmsa_r.vals_in = vals_in;
        pmsa_r.p = q+1;
        pmsa_r.r = r;
        pmsa_r.vals_out = scratch;
        pmsa_r.s = q+1;
        pmsa_r.temp = vals_out;

        if (n > SORT_THRESHOLD) {
            pthread_create(&worker, NULL, p_merge_sort, &pmsa_l);
        } else {
            p_merge_sort(&pmsa_l);
        }
        p_merge_sort(&pmsa_r);

        if (n > SORT_THRESHOLD) {
            pthread_join(worker, NULL);
        }

        pmerge_args pma;
        pma.temp = scratch + p;
        pma.p1 = 0;
        pma.r1 = q_ - 1;
        pma.p2 = q_;
        pma.r2 = n - 1;
        pma.vals_out = vals_out + p;
        pma.p3 = s - p;
        p_merge(&pma);
    }
}

void *p_merge(void *v_pma) {
    pmerge_args pma = *((pmerge_args *) v_pma);
    int *temp = pma.temp;
    int p1 = pma.p1;
    int r1 = pma.r1;
    int p2 = pma.p2;
    int r2 = pma.r2;
    int *vals_out = pma.vals_out;
    int p3 = pma.p3;

    int n1 = r1 - p1 + 1;
    int n2 = r2 - p2 + 1;
    int q1, q2, q3, t;
    pthread_t worker;

    if (n1 < n2) {
        t = p1; p1 = p2; p2 = t;
        t = r1; r1 = r2; r2 = t;
        t = n1; n1 = n2; n2 = t;
    }
    if (n1 > MERGE_THRESHOLD) {
        merge_thread_count++;
    }

    if (n1 == 0) {
        return;
    } else {

        q1 = (p1 + r1) / 2;
        q2 = binary_search(temp[q1], temp, p2, r2);
        q3 = p3 + (q1 - p1) + (q2 - p2);
        vals_out[q3] = temp[q1];

        pmerge_args pma_l;
        pma_l.temp = temp;
        pma_l.p1 = p1;
        pma_l.r1 = q1-1;
        pma_l.p2 = p2;
        pma_l.r2 = q2-1;
        pma_l.vals_out = vals_out;
        pma_l.p3 = p3;

        if (n1 > MERGE_THRESHOLD) {
            pthread_create(&worker, NULL, p_merge, &pma_l);
        } else {
            p_merge(&pma_l);
        }

        pmerge_args pma_r;
        pma_r.temp = temp;
        pma_r.p1 = q1+1;
        pma_r.r1 = r1;
        pma_r.p2 = q2;
        pma_r.r2 = r2;
        pma_r.vals_out = vals_out;
        pma_r.p3 = q3+1;

        p_merge(&pma_r);

        if (n1 > MERGE_THRESHOLD) {
            pthread_join(worker, NULL);
        }
    }
}

int binary_search(int val, int *temp, int p, int r) {
    int low = p;
    int mid;
    int high = (p > r+1)? p : r+1;

    while (low < high) {
        mid = (low + high) / 2;
        if (val <= temp[mid]) {
            high = mid;
        } else {
            low = mid + 1;
        }
    }
    return high;
}
于 2012-07-08T04:28:11.203 に答える
0

実装を高速化するための並列化はあまり意味がないため、システムに過度のストレスをかけています。並列化にはコストがかかります。システムをそのようなスレッドであふれさせると、システム全体で多くの作業を行う必要があります。スレッドは無料ではありません。

特に、あまりにも多くのスレッドを要求するとプログラムがクラッシュするという「問題」については、これは完全にあなたのせいです: のマニュアルページを読んでくださいpthread_create。その関数は値を返すと述べており、それには理由があります。

スピードアップを得るために (これはあなたが探しているものだと思います)、システムに物理コアを持っている以上のものを得ることは期待できません。場合によっては、コアよりも少し多くのスレッド (たとえば 2 倍) を使用するとよい場合もありますが、すぐに、スレッドが作成するオーバーヘッドが、得られるよりもはるかに多くなります。

次に、mergesort は、通常、比較ではなく RAM へのアクセスによってバインドされるアルゴリズムです。RAM アクセス (mergesort のようにストリーミングを行っている場合でも) は、CPU よりも桁違いに遅くなります。さらに、メモリバスは並列デバイスではありません。メモリアクセスで使用できる唯一の並列処理はキャッシュです (存在する場合)。メモリ フットプリントを 2 倍に膨らませると、すべてのパフォーマンスの向上が台無しになる可能性があります。コードでは、個々のスレッド呼び出しで下にメモリを割り当てることでさらに悪化します。メモリを単独で割り当てるとコストがかかるため、システムはこれらの割り当てを調整する必要があります。

もう一度やり直すには、まず適切なメモリ処理とアクセス パターンを持つ再帰的なマージソート アルゴリズムを記述します。再帰の最上位ノードにいくつかの大きなバッファーのみを割り当て、その一部を再帰呼び出しに渡します。

2 つの並べ替えられたバッファーを 3 番目のバッファーにマージする別のマージ ルーチンを作成します。それをベンチマークし、アルゴリズムが費やすソート項目ごとのマイクロ秒を計算します。それから、CPU の速度を使用して、ソートされたアイテムごとに無駄になるサイクル数を計算します。マージのためにコンパイラが生成するアセンブラを読んで、複雑すぎるように見える場合は、改善方法を見つけてください。

その後、再帰関数に並列処理を追加し始めます。

于 2012-07-08T07:05:15.913 に答える