6

私は頻繁に次の形式のC++コードを書いています。

while (getline(strm, line)) {
    cout << computationally_intensive_function(line) << endl;
}

このコードを並列化したいと思います。私がこれまでに思いついた最善の解決策は、多数(10000-100000)の行を保持する文字列のベクトルを作成し、このベクトルを次のように並列化することです。

#pragma omp parallel for

次に、ベクトルを空にして、線が残っている間に繰り返します。ただし、この方法では大量のメモリが必要であり、メインプロセスが文字列をバッファリングしている間は他のコアがアイドル状態になります。もっと良い方法はありますか?Pythonmultiprocessing.Pool.mapやHadoopのようなものですか?(ただし、Hadoopはかなり重量があり、コードが実行されるすべての場所にインストールされない可能性があるため、HadoopのC ++ APIの使用は避けたいと思います。)

4

2 に答える 2

5

OpenMP 3.0タスクのよく知られていない機能が存在しますが、このようなケースをカバーするために特別に作成されたため、非常に残念です。コンパイラがその標準バージョンをサポートしている場合は、間違いなくOpenMPタスクを実行する必要があります。stdoutただし、複数のスレッドから(または)に書き込むと、std::cout通常、出力がひどく混ざり合うため、同期する必要があることに注意してください。

#pragma omp parallel
{
    #pragma omp master
    while (getline(strm, line))
    #pragma omp task
    {
        result_type result = computationally_intensive_function(line);
        #pragma omp critical
        {
            cout << result << endl;
            cout.flush();
        }
    }
    #pragma omp taskwait
}

変数を決定するのはあなたに任せていsharedますprivate

于 2012-05-21T05:49:39.743 に答える
1

計算をファイルからの行の読み取りとオーバーラップさせる必要があります。そのための良い方法の1つは、スレッディングビルディングブロックパイプラインアルゴリズムを使用することです。あなたがすることは、3つの(擬似コードの例で示しているものに基づいて)フィルターを指定することです。2つはシリアルで、もう1つはパラレルです。シリアルフィルターは入力フィルターと出力フィルターです。1つ目は、ファイルから1行ずつデータを読み取り、各行を2つ目のフィルターに渡します。このフィルターは並列であり、計算/処理関数をマルチスレッドモードで実行します。最終ステージ/フィルターもシリアルであり、出力を行います。TBBチュートリアルの例をコピーして貼り付けています。これは、達成したいことを正確に実行しているようです。

// Holds a slice of text.
/** Instances *must* be allocated/freed using methods herein, because the
C++ declaration
represents only the header of a much larger object in memory. */
class TextSlice {
    // Pointer to one past last character in sequence
    char* logical_end;
    // Pointer to one past last available byte in sequence.
    char* physical_end;
public:
    // Allocate a TextSlice object that can hold up to max_size characters.
    static TextSlice* allocate( size_t max_size ) {
        // +1 leaves room for a terminating null character.
        TextSlice* t = (TextSlice*)tbb::tbb_allocator<char>().allocate(sizeof(TextSlice)+max_size+1 );
        t->logical_end = t->begin();
        t->physical_end = t->begin()+max_size;
        return t;
    }
    // Free this TextSlice object
    void free() {
        tbb::tbb_allocator<char>().deallocate((char*)this,
        sizeof(TextSlice)+(physical_end-begin())+1);
    }
    // Pointer to beginning of sequence
    char* begin() {return (char*)(this+1);}
    // Pointer to one past last character in sequence
    char* end() {return logical_end;}
    // Length of sequence
    size_t size() const {return logical_end-(char*)(this+1);}
    // Maximum number of characters that can be appended to sequence
    size_t avail() const {return physical_end-logical_end;}
    // Append sequence [first,last) to this sequence.
    void append( char* first, char* last ) {
        memcpy( logical_end, first, last-first );
        logical_end += last-first;
    }
    // Set end() to given value.
    void set_end( char* p ) {logical_end=p;}
};

そして、これを実行するための関数は次のとおりです。

void RunPipeline( int ntoken, FILE* input_file, FILE* output_file ) {
    tbb::parallel_pipeline(
    ntoken,
    tbb::make_filter<void,TextSlice*>(
    tbb::filter::serial_in_order, MyInputFunc(input_file) )
    &
    tbb::make_filter<TextSlice*,TextSlice*>(
    tbb::filter::parallel, MyTransformFunc() )
    &
    tbb::make_filter<TextSlice*,void>(
    tbb::filter::serial_in_order, MyOutputFunc(output_file) ) );
}
于 2012-05-21T10:47:27.243 に答える