3

次のコードは、それぞれ複数の 128 要素ベクトルを含む2 つのstd::vectors v1およびで動作します。v2外側のベクトルを通るループ (i1とを使用) には、さらに複雑な処理が実行されるとi2の組み合わせを制限するように設計された内側のループが含まれます。組み合わせの約 99.9% が除外されます。i1i2

残念なことに、フィルタリング ループは私のプログラムの主要なボトルネックif(a[k] + b[k] > LIMIT)です。

const vector<vector<uint16_t>> & v1 = ...
const vector<vector<uint16_t>> & v2 = ...

for(size_t i1 = 0; i1 < v1.size(); ++i1) { //v1.size() and v2.size() about 20000
    for(size_t i2 = 0; i2 < v2.size(); ++i2) {

        const vector<uint16_t> & a = v1[i1];
        const vector<uint16_t> & b = v2[i2];

        bool good = true;
        for(std::size_t k = 0; k < 128; ++k) {
            if(a[k] + b[k] > LIMIT) { //LIMIT is a const uint16_t: approx 16000
                good = false;
                break;
            }
        }
        if(!good) continue;

        // Further processing involving i1 and i2
    }
}

このコードのパフォーマンスは、メモリの局所性を高め、さらにベクトル化することで改善できると思います。これを行う方法、またはその他の改善点について何か提案はありますか?

4

4 に答える 4

3

内側のループに SIMD を適用できます。

    bool good = true;
    for(std::size_t k = 0; k < 128; ++k) {
        if(a[k] + b[k] > LIMIT) { //LIMIT is a const uint16_t: approx 16000
            good = false;
            break;
        }

次のように:

#include <emmintrin.h>  // SSE2 intrinsics
#include <limits.h>     // SHRT_MIN

// ...

    // some useful constants - declare these somewhere before the outermost loop

    const __m128i vLIMIT = _mm_set1_epi16(LIMIT + SHRT_MIN); // signed version of LIMIT
    const __m128i vOFFSET = _mm_set1_epi16(SHRT_MIN);        // offset for uint16_t -> int16_t conversion

// ...

    bool good = true;
    for(std::size_t k = 0; k < 128; k += 8) {
        __m128i v, va, vb;              // iterate through a, b, 8 elements at a time
        int mask;
        va = _mm_loadu_si128(&a[k]);    // get 8 elements from a[k], b[k]
        vb = _mm_loadu_si128(&b[k]);
        v = _mm_add_epi16(va, vb);      // add a and b vectors
        v = _mm_add_epi16(v, vOFFSET);  // subtract 32768 to make signed
        v = _mm_cmpgt_epi16(v, vLIMIT); // compare against LIMIT
        mask = _mm_maskmove_epi8(v);    // get comparison results as 16 bit mask
        if (mask != 0) {                // if any value exceeded limit
            good = false;               // clear good flag and exit loop
            break;
        }

警告: テストされていないコード - デバッグが必要な場合がありますが、一般的なアプローチは適切です。

于 2013-10-08T13:53:29.450 に答える
1

の最も効率的なアクセス パターンを取得しましたが、外側のループの反復ごとにv1すべてを順次スキャンしています。アクセスによって (L2 およびおそらく L3 も) キャッシュ ミスが継続的に発生するv2ため、これは非常に非効率的です。v2

v1より良いアクセス パターンは、ループの入れ子を増やすことです。これにより、外側のループがandを通り抜けv2、内側のループが両方のサブセグメント内の要素を処理し、キャッシュに収まるほど小さくv1なります。v2

基本的に、代わりに

for(size_t i1 = 0; i1 < v1.size(); ++i1) { //v1.size() and v2.size() about 20000
    for(size_t i2 = 0; i2 < v2.size(); ++i2) {

行う

for(size_t i2a = 0; i2a < v2.size(); i2a += 32) {
    for(size_t i1 = 0; i1 < v1.size(); ++i1) {
        for(size_t i2 = i2a; i2 < v2.size() && i2 < i2a + 32; ++i2) {

または

size_t i2a = 0;

// handle complete blocks
for(; i2a < v2.size() - 31; i2a += 32) {
    for(size_t i1 = 0; i1 < v1.size(); ++i1) {
        for(size_t i2 = i2a; i2 < i2a + 32; ++i2) {

        }
    }
}

// handle leftover partial block
for(size_t i1 = 0; i1 < v1.size(); ++i1) {
    for(size_t i2 = i2a; i2 < v2.size(); ++i2) {
    }
}

このようにして、32 * 128 * sizeof (uint16_t)バイトのチャンクまたは 8kB がv2キャッシュからロードされ、20,000 回再利用されます。

この改善は、SIMD (SSE) ベクトル化と直交しています。スレッドベースの並列処理と相互作用しますが、おそらく良い方法です。

于 2013-10-11T16:49:17.560 に答える
0

まず、単純な最適化の 1 つとしてこれが考えられますが、コンパイラはこれを単独で実行できるため、どれだけ改善できるかはわかりません。

for(std::size_t k = 0; k < 128 && good; ++k)
{
    good = a[k] + b[k] <= LIMIT;
}

第 2 に、i1 と i2 に関連する処理が CPU キャッシュを破壊する可能性があるため、良い結果を 2 番目のベクトルに保持する方がよいと思います。

3番目に、これは主要な最適化になる可能性があります。2番目の for ループを次のように書き直すことができると思います: for(size_t i2 = i1; i2 < v2.size(); ++i2) a と b に + 操作を使用しているため交換可能なベクトルなので、i1 と i2 の結果は i2 と i1 と同じになります。このためには、v1 と v2 を同じサイズにする必要があります。サイズが異なる場合は、反復を別の方法で記述する必要があります。

Fort、私が見る限り、あなたは2つの行列を処理しています.ベクトルのベクトルではなく、要素のベクトルを保持する方が良いでしょう.

お役に立てれば。ラズヴァン。

于 2013-10-08T10:03:34.630 に答える
0

いくつかの提案:

  • コメントで提案されているように、内部の 128 要素ベクトルを配列に置き換えて、メモリの局所性を向上させます。
  • このコードは高度に並列化可能に見えますが、試してみましたか? フィルタリングの組み合わせを使用可能なすべてのコアに分割してから、収集された作業を再調整し、処理をすべてのコアに分割できます。

内側の 128 要素の配列、並列化のための PPL (VS 2012 以降が必要)、およびフィルタリングのための少しの SSE コードを使用するバージョンを実装したところ、かなりのスピードアップが得られました。「さらなる処理」が正確に何を含むかに応じて、物事をわずかに異なる方法で構造化することには利点があるかもしれません (この例では、たとえば、フィルタリング後に作業のバランスを取り直していません)。

更新: Ben Voigt によって提案されたキャッシュ ブロックを実装し、もう少し高速化しました。

#include <vector>
#include <array>
#include <random>
#include <limits>
#include <cstdint>
#include <iostream>
#include <numeric>
#include <chrono>
#include <iterator>

#include <ppl.h>

#include <immintrin.h>

using namespace std;
using namespace concurrency;

namespace {
const int outerVecSize = 20000;
const int innerVecSize = 128;
const int LIMIT = 16000;
auto engine = default_random_engine();
};

typedef vector<uint16_t> InnerVec;
typedef array<uint16_t, innerVecSize> InnerArr;

template <typename Cont> void randomFill(Cont& c) {
    // We want approx 0.1% to pass filter, the mean and standard deviation are chosen to get close to that
    static auto dist = normal_distribution<>(LIMIT / 4.0, LIMIT / 4.6);
    generate(begin(c), end(c), [] {
        auto clamp = [](double x, double minimum, double maximum) { return min(max(minimum, x), maximum); };
        return static_cast<uint16_t>(clamp(dist(engine), 0.0, numeric_limits<uint16_t>::max()));
    });
}

void resizeInner(InnerVec& v) { v.resize(innerVecSize); }
void resizeInner(InnerArr& a) {}

template <typename Inner> Inner generateRandomInner() {
    auto inner = Inner();
    resizeInner(inner);
    randomFill(inner);
    return inner;
}

template <typename Inner> vector<Inner> generateRandomInput() {
    auto outer = vector<Inner>(outerVecSize);
    generate(begin(outer), end(outer), generateRandomInner<Inner>);
    return outer;
}

void Report(const chrono::high_resolution_clock::duration elapsed, size_t in1Size, size_t in2Size,
            const int passedFilter, const uint32_t specialValue) {
    cout << passedFilter << "/" << in1Size* in2Size << " ("
         << 100.0 * (double(passedFilter) / double(in1Size * in2Size)) << "%) passed filter\n";
    cout << specialValue << "\n";
    cout << "Elapsed time = " << chrono::duration_cast<chrono::milliseconds>(elapsed).count() << "ms" << endl;
}

void TestOriginalVersion() {
    cout << __FUNCTION__ << endl;

    engine.seed();
    const auto v1 = generateRandomInput<InnerVec>();
    const auto v2 = generateRandomInput<InnerVec>();

    int passedFilter = 0;
    uint32_t specialValue = 0;

    auto startTime = chrono::high_resolution_clock::now();

    for (size_t i1 = 0; i1 < v1.size(); ++i1) { // v1.size() and v2.size() about 20000
        for (size_t i2 = 0; i2 < v2.size(); ++i2) {

            const vector<uint16_t>& a = v1[i1];
            const vector<uint16_t>& b = v2[i2];

            bool good = true;
            for (std::size_t k = 0; k < 128; ++k) {
                if (static_cast<int>(a[k]) + static_cast<int>(b[k])
                    > LIMIT) { // LIMIT is a const uint16_t: approx 16000
                    good = false;
                    break;
                }
            }
            if (!good) continue;

            // Further processing involving i1 and i2
            ++passedFilter;
            specialValue += inner_product(begin(a), end(a), begin(b), 0);
        }
    }

    auto endTime = chrono::high_resolution_clock::now();

    Report(endTime - startTime, v1.size(), v2.size(), passedFilter, specialValue);
}

bool needsProcessing(const InnerArr& a, const InnerArr& b) {
    static_assert(sizeof(a) == sizeof(b) && (sizeof(a) % 16) == 0, "Array size must be multiple of 16 bytes.");
    static const __m128i mmLimit = _mm_set1_epi16(LIMIT);
    static const __m128i mmLimitPlus1 = _mm_set1_epi16(LIMIT + 1);
    static const __m128i mmOnes = _mm_set1_epi16(-1);

    auto to_m128i = [](const uint16_t* p) { return reinterpret_cast<const __m128i*>(p); };
    return equal(to_m128i(a.data()), to_m128i(a.data() + a.size()), to_m128i(b.data()), [&](const __m128i& a, const __m128i& b) {
        // avoid overflow due to signed compare by clamping sum to LIMIT + 1
        const __m128i clampSum = _mm_min_epu16(_mm_adds_epu16(a, b), mmLimitPlus1);
        return _mm_test_all_zeros(_mm_cmpgt_epi16(clampSum, mmLimit), mmOnes);
    });
}

void TestArrayParallelVersion() {
    cout << __FUNCTION__ << endl;

    engine.seed();
    const auto v1 = generateRandomInput<InnerArr>();
    const auto v2 = generateRandomInput<InnerArr>();

    combinable<int> passedFilterCombinable;
    combinable<uint32_t> specialValueCombinable;

    auto startTime = chrono::high_resolution_clock::now();

    const size_t blockSize = 64;

    parallel_for(0u, v1.size(), blockSize, [&](size_t i) {
        for (const auto& b : v2) {
            const auto blockBegin = begin(v1) + i;
            const auto blockEnd = begin(v1) + min(v1.size(), i + blockSize);
            for (auto it = blockBegin; it != blockEnd; ++it) {
                const InnerArr& a = *it;
                if (!needsProcessing(a, b))
                    continue;

                // Further processing involving a and b
                ++passedFilterCombinable.local();
                specialValueCombinable.local() += inner_product(begin(a), end(a), begin(b), 0);
            }
        }
    });

    auto passedFilter = passedFilterCombinable.combine(plus<int>());
    auto specialValue = specialValueCombinable.combine(plus<uint32_t>());

    auto endTime = chrono::high_resolution_clock::now();

    Report(endTime - startTime, v1.size(), v2.size(), passedFilter, specialValue);
}

int main() {
    TestOriginalVersion();
    TestArrayParallelVersion();
}

私の 8 コア システムではかなりの高速化が見られます。結果は、コアの数などによって異なります。

TestOriginalVersion
441579/400000000 (0.110395%) passed filter
2447300015
Elapsed time = 12525ms
TestArrayParallelVersion
441579/400000000 (0.110395%) passed filter
2447300015
Elapsed time = 657ms
于 2013-10-08T16:55:28.810 に答える