7

私は遊んでいてstd::thread、奇妙なものがポップアップしました:

#include <thread>

int k = 0;

int main() {
    std::thread t1([]() { while (k < 1000000000) { k = k + 1; }});
    std::thread t2([]() { while (k < 1000000000) { k = k + 1; }});

    t1.join();
    t2.join();

    return 0;
}

clang++ を使用して最適化なしで上記のコードをコンパイルすると、次のベンチマークが得られました。

real 0m2.377s  
user 0m4.688s  
sys  0m0.005s

次に、コードを次のように変更しました:(現在は1つのスレッドのみを使用しています)

#include <thread>

int k = 0;

int main() {
    std::thread t1([]() { while (k < 1000000000) { k = k + 1; }});

    t1.join();

    return 0;
}

そして、これらは新しいベンチマークでした:

real 0m2.304s
user 0m2.298s
sys  0m0.003s

2 つのスレッドを利用するコードは、1 つを利用するコードよりも遅いのはなぜですか?

4

4 に答える 4

4

これは実際には Mats Petersson の回答に対するコメントである必要がありますが、コード例を提供したかったのです。

問題は、特定のリソースとキャッシュラインの競合です。

代替案 1:

#include <cstdint>
#include <thread>
#include <vector>
#include <stdlib.h>

static const uint64_t ITERATIONS = 10000000000ULL;

int main(int argc, const char** argv)
{
    size_t numThreads = 1;
    if (argc > 1) {
        numThreads = strtoul(argv[1], NULL, 10);
        if (numThreads == 0)
            return -1;
    }

    std::vector<std::thread> threads;

    uint64_t k = 0;
    for (size_t t = 0; t < numThreads; ++t) {
       threads.emplace_back([&k]() { // capture k by reference so we all use the same k.
           while (k < ITERATIONS) {
               k++;
           }
       });
    }

    for (size_t t = 0; t < numThreads; ++t) {
        threads[t].join();
    }
    return 0;
}

ここでは、スレッドは単一の変数を求めて競合し、読み取りと書き込みの両方を実行するため、競合が発生し、シングルスレッドのケースが最も効率的になります。

#include <cstdint>
#include <thread>
#include <vector>
#include <stdlib.h>
#include <atomic>

static const uint64_t ITERATIONS = 10000000000ULL;

int main(int argc, const char** argv)
{
    size_t numThreads = 1;
    if (argc > 1) {
        numThreads = strtoul(argv[1], NULL, 10);
        if (numThreads == 0)
            return -1;
    }

    std::vector<std::thread> threads;

    std::atomic<uint64_t> k = 0;
    for (size_t t = 0; t < numThreads; ++t) {
       threads.emplace_back([&]() {
           // Imperfect division of labor, we'll fall short in some cases.
           for (size_t i = 0; i < ITERATIONS / numThreads; ++i) {
               k++;
           }
       });
    }

    for (size_t t = 0; t < numThreads; ++t) {
        threads[t].join();
    }
    return 0;
}

ここでは、作業を決定論的に分割します (numThreads が ITERATIONS の約数ではないが、このデモンストレーションには十分に近い場合に該当します)。残念ながら、メモリ内の共有要素へのアクセスに関して、まだ競合が発生しています。

#include <cstdint>
#include <thread>
#include <vector>
#include <stdlib.h>
#include <atomic>

static const uint64_t ITERATIONS = 10000000000ULL;

int main(int argc, const char** argv)
{
    size_t numThreads = 1;
    if (argc > 1) {
        numThreads = strtoul(argv[1], NULL, 10);
        if (numThreads == 0)
            return -1;
    }

    std::vector<std::thread> threads;
    std::vector<uint64_t> ks;

    for (size_t t = 0; t < numThreads; ++t) {
       threads.emplace_back([=, &ks]() {
           auto& k = ks[t];
           // Imperfect division of labor, we'll fall short in some cases.
           for (size_t i = 0; i < ITERATIONS / numThreads; ++i) {
               k++;
           }
       });
    }

    uint64_t k = 0;
    for (size_t t = 0; t < numThreads; ++t) {
        threads[t].join();
        k += ks[t];
    }
    return 0;
}

繰り返しますが、これはワークロードの分散に関して決定論的であり、結果を照合するために最後に少し労力を費やします。ただし、カウンターの配布が健全な CPU 配布を優先するようにすることは何もしませんでした。そのために:

#include <cstdint>
#include <thread>
#include <vector>
#include <stdlib.h>

static const uint64_t ITERATIONS = 10000000000ULL;
#define CACHE_LINE_SIZE 128

int main(int argc, const char** argv)
{
    size_t numThreads = 1;
    if (argc > 1) {
        numThreads = strtoul(argv[1], NULL, 10);
        if (numThreads == 0)
            return -1;
    }

    std::vector<std::thread> threads;
    std::mutex kMutex;
    uint64_t k = 0;

    for (size_t t = 0; t < numThreads; ++t) {
       threads.emplace_back([=, &k]() {
           alignas(CACHE_LINE_SIZE) uint64_t myK = 0;
           // Imperfect division of labor, we'll fall short in some cases.
           for (uint64_t i = 0; i < ITERATIONS / numThreads; ++i) {
               myK++;
           }
           kMutex.lock();
           k += myK;
           kMutex.unlock();
       });
    }

    for (size_t t = 0; t < numThreads; ++t) {
        threads[t].join();
    }
    return 0;
}

ここでは、ミューテックスを使用して同期を制御する最後の 1 つのケースを除いて、スレッド間の競合をキャッシュ ライン レベルまで回避します。この些細なワークロードの場合、ミューテックスは相対コストが非常に高くなります。または、alignas を使用して、各スレッドに外部スコープで独自のストレージを提供し、結合後に結果を要約して、ミューテックスの必要性をなくすこともできます。それは読者の練習問題として残しておきます。

于 2013-06-16T22:09:44.093 に答える
2

「なぜこれが機能しなかったのか」よりも重要な質問のように思えます。「これを機能させるにはどうすればよいですか?」当面のタスクについては、 (重大な欠点std::asyncにもかかわらず)直接使用するよりも本当に優れたツールだと思います。std::thread

#include <future>
#include <iostream>

int k = 0;
unsigned tasks = std::thread::hardware_concurrency();
unsigned reps = 1000000000 / tasks;

int main() {
    std::vector<std::future<int>> f;

    for (int i=0; i<tasks; i++)
        f.emplace_back(std::async(std::launch::async, 
                                  [](){int j; for (j=0; j<reps; j++); return j;})
                      );

    for (int i=0; i<tasks; i++) {
        f[i].wait();
        k += f[i].get();
    }

    std::cout << k << "\n";
    return 0;
}
于 2013-06-16T22:56:02.003 に答える