0

C++ で Intel スレッド ビルディング ブロックを使用した並列マージソートに最適なコードが必要です

4

2 に答える 2

2

まず最初に、私の経験では tbb::parallel_sort() は非常に効率的で、投稿しようとしているコードよりも少し高速です (少なくとも、数千の要素のオーダーの入力については、テスト済み)。

そうは言っても、次のコードはまさにあなたが探しているものだと思います。変数は自明である必要があり、コード内のドキュメントは残りを説明する必要があります-

これは並列化に必要です:

#include<tbb/parallel_invoke.h>

より高速に動作する可能性のある Concurrency::parallel_invoke() を使用する場合は、次の内容を含めます。

#include<ppl.h>

これらの設定をお勧めします -

#define MIN_ELEMENTS_FOR_RECURSION            (50)
#define MIN_ELEMENTS_FOR_PARALLEL_PROCESSING  (100)

以下は、呼び出す主な関数です。パラメーターは、ランダム アクセス クラス (ベクター、デキューなど) の開始と終了の反復子と、比較関数です。

template <typename T_it, typename T_it_dereferenced>
void parallelMergeSort( T_it first, T_it last,  bool (*firstLessThanSecond)(const T_it_dereferenced& a, const T_it_dereferenced& b) )
{
    // create copy of container for extra space
    std::vector<T_it_dereferenced> copy(first, last);

    parallelMergeSortRecursive( first, last, copy.begin(), copy.end(), firstLessThanSecond );
}

これは、各半分をソートするために、parallelMergeSort() から再帰的に呼び出されます -

template <typename T_it, typename T_it_dereferenced>
void parallelMergeSortRecursive( T_it source_first, T_it source_last, T_it copy_first, T_it copy_last,
bool (*firstLessThanSecond)(const T_it_dereferenced& a, const T_it_dereferenced& b), int recursion_depth = 0 )
{
    // divide the array in two, and sort the two halves

    long num_elements = source_last - source_first;

    if ( num_elements > MIN_ELEMENTS_FOR_RECURSION )
    {
        T_it source_middle = source_first + num_elements / 2;
        T_it copy_middle = copy_first + num_elements / 2;

        if ( num_elements > MIN_ELEMENTS_FOR_PARALLEL_PROCESSING )
        {
            // Concurrency::parallel_invoke() may work faster
            tbb::parallel_invoke(
                [=] { parallelMergeSortRecursive( source_first,     source_middle,  copy_first,  copy_middle,   firstLessThanSecond, recursion_depth + 1 ); },
                [=] { parallelMergeSortRecursive( source_middle,    source_last,    copy_middle, copy_last,     firstLessThanSecond, recursion_depth + 1 ); }
            );
        }
        else // sort serially rather than in parallel
        {
            parallelMergeSortRecursive( source_first,   source_middle,  copy_first,  copy_middle,   firstLessThanSecond, recursion_depth + 1 );
            parallelMergeSortRecursive( source_middle,  source_last,    copy_middle, copy_last,     firstLessThanSecond, recursion_depth + 1 );
        }

        // merge the two sorted halves

        // we switch source <--> target with each level of recursion.
        // at even recursion depths (including zero which is the root level) we assume the source is sorted and merge into the target

        if ( recursion_depth % 2 == 0 ) 
        {
            merge( source_first, copy_first, copy_middle, copy_last, firstLessThanSecond );
        }
        else
        {
            merge( copy_first, source_first, source_middle, source_last, firstLessThanSecond );
        }
    }
    else // very few elements remain to be sorted, stop the recursion and sort in place
    {
        if ( recursion_depth % 2 == 0 )
        {
            std::stable_sort(source_first, source_last, firstLessThanSecond);
        }
        else
        {
            std::stable_sort(copy_first, copy_last, firstLessThanSecond);
        }
    }
}

これは、2 つの半分をマージするために再帰関数から呼び出されます -

template <typename T_it, typename T_it_dereferenced>
void merge( T_it target_first, T_it source_first, T_it source_middle, T_it source_last,
bool (*firstLessThanSecond)(const T_it_dereferenced& a, const T_it_dereferenced& b) )
{
    // source is assumed to contain two sorted sequences (from first to middle and from middle to last)

    T_it source_it1 = source_first;
    T_it source_it2 = source_middle;
    T_it target_it = target_first;

    for ( /* intentional */ ; source_it1 < source_middle && source_it2 < source_last ; ++target_it )
    {
        //if ( source_container[i] < source_container[j] )
        if (  firstLessThanSecond(*source_it1, *source_it2)  )
        {
            *target_it = *source_it1;
            ++source_it1;
        }
        else
        {
            *target_it = *source_it2;
            ++source_it2;
        }
    }

    // insert remaining elements in non-completely-traversed-half into original container
    // only one of these two whiles will execute since one of the conditions caused the previous while to stop

    for ( /* intentional */ ; source_it1 < source_middle ; ++target_it )
    {
        *target_it = *source_it1;
        ++source_it1;
    }

    for ( /* intentional */ ; source_it2 < source_last ; ++target_it )
    {
        *target_it = *source_it2;
        ++source_it2;
    }
}
于 2013-06-20T09:43:17.360 に答える
1

TBB には既にソート方法 (並列クイックソート) が含まれていますが、これは実装が非常に貧弱です (実行時間はプロセッサの数に関係なく、少なくとも線形です)。

私の提案は、既存の実装から並列マージソートを移植することです。たとえば、OpenMP を使用する gnu 並列モードの並べ替え (ソース ファイルを含む最近の gcc に含まれています)。すべて#pragma ompをいくつかの tbb 並列コードに置き換えるだけです。

于 2011-03-29T19:33:47.793 に答える