いくつかの提案:
- コメントで提案されているように、内部の 128 要素ベクトルを配列に置き換えて、メモリの局所性を向上させます。
- このコードは高度に並列化可能に見えますが、試してみましたか? フィルタリングの組み合わせを使用可能なすべてのコアに分割してから、収集された作業を再調整し、処理をすべてのコアに分割できます。
内側の 128 要素の配列、並列化のための PPL (VS 2012 以降が必要)、およびフィルタリングのための少しの SSE コードを使用するバージョンを実装したところ、かなりのスピードアップが得られました。「さらなる処理」が正確に何を含むかに応じて、物事をわずかに異なる方法で構造化することには利点があるかもしれません (この例では、たとえば、フィルタリング後に作業のバランスを取り直していません)。
更新: Ben Voigt によって提案されたキャッシュ ブロックを実装し、もう少し高速化しました。
#include <vector>
#include <array>
#include <random>
#include <limits>
#include <cstdint>
#include <iostream>
#include <numeric>
#include <chrono>
#include <iterator>
#include <ppl.h>
#include <immintrin.h>
using namespace std;
using namespace concurrency;
namespace {
const int outerVecSize = 20000;
const int innerVecSize = 128;
const int LIMIT = 16000;
auto engine = default_random_engine();
};
typedef vector<uint16_t> InnerVec;
typedef array<uint16_t, innerVecSize> InnerArr;
template <typename Cont> void randomFill(Cont& c) {
// We want approx 0.1% to pass filter, the mean and standard deviation are chosen to get close to that
static auto dist = normal_distribution<>(LIMIT / 4.0, LIMIT / 4.6);
generate(begin(c), end(c), [] {
auto clamp = [](double x, double minimum, double maximum) { return min(max(minimum, x), maximum); };
return static_cast<uint16_t>(clamp(dist(engine), 0.0, numeric_limits<uint16_t>::max()));
});
}
void resizeInner(InnerVec& v) { v.resize(innerVecSize); }
void resizeInner(InnerArr& a) {}
template <typename Inner> Inner generateRandomInner() {
auto inner = Inner();
resizeInner(inner);
randomFill(inner);
return inner;
}
template <typename Inner> vector<Inner> generateRandomInput() {
auto outer = vector<Inner>(outerVecSize);
generate(begin(outer), end(outer), generateRandomInner<Inner>);
return outer;
}
void Report(const chrono::high_resolution_clock::duration elapsed, size_t in1Size, size_t in2Size,
const int passedFilter, const uint32_t specialValue) {
cout << passedFilter << "/" << in1Size* in2Size << " ("
<< 100.0 * (double(passedFilter) / double(in1Size * in2Size)) << "%) passed filter\n";
cout << specialValue << "\n";
cout << "Elapsed time = " << chrono::duration_cast<chrono::milliseconds>(elapsed).count() << "ms" << endl;
}
void TestOriginalVersion() {
cout << __FUNCTION__ << endl;
engine.seed();
const auto v1 = generateRandomInput<InnerVec>();
const auto v2 = generateRandomInput<InnerVec>();
int passedFilter = 0;
uint32_t specialValue = 0;
auto startTime = chrono::high_resolution_clock::now();
for (size_t i1 = 0; i1 < v1.size(); ++i1) { // v1.size() and v2.size() about 20000
for (size_t i2 = 0; i2 < v2.size(); ++i2) {
const vector<uint16_t>& a = v1[i1];
const vector<uint16_t>& b = v2[i2];
bool good = true;
for (std::size_t k = 0; k < 128; ++k) {
if (static_cast<int>(a[k]) + static_cast<int>(b[k])
> LIMIT) { // LIMIT is a const uint16_t: approx 16000
good = false;
break;
}
}
if (!good) continue;
// Further processing involving i1 and i2
++passedFilter;
specialValue += inner_product(begin(a), end(a), begin(b), 0);
}
}
auto endTime = chrono::high_resolution_clock::now();
Report(endTime - startTime, v1.size(), v2.size(), passedFilter, specialValue);
}
bool needsProcessing(const InnerArr& a, const InnerArr& b) {
static_assert(sizeof(a) == sizeof(b) && (sizeof(a) % 16) == 0, "Array size must be multiple of 16 bytes.");
static const __m128i mmLimit = _mm_set1_epi16(LIMIT);
static const __m128i mmLimitPlus1 = _mm_set1_epi16(LIMIT + 1);
static const __m128i mmOnes = _mm_set1_epi16(-1);
auto to_m128i = [](const uint16_t* p) { return reinterpret_cast<const __m128i*>(p); };
return equal(to_m128i(a.data()), to_m128i(a.data() + a.size()), to_m128i(b.data()), [&](const __m128i& a, const __m128i& b) {
// avoid overflow due to signed compare by clamping sum to LIMIT + 1
const __m128i clampSum = _mm_min_epu16(_mm_adds_epu16(a, b), mmLimitPlus1);
return _mm_test_all_zeros(_mm_cmpgt_epi16(clampSum, mmLimit), mmOnes);
});
}
void TestArrayParallelVersion() {
cout << __FUNCTION__ << endl;
engine.seed();
const auto v1 = generateRandomInput<InnerArr>();
const auto v2 = generateRandomInput<InnerArr>();
combinable<int> passedFilterCombinable;
combinable<uint32_t> specialValueCombinable;
auto startTime = chrono::high_resolution_clock::now();
const size_t blockSize = 64;
parallel_for(0u, v1.size(), blockSize, [&](size_t i) {
for (const auto& b : v2) {
const auto blockBegin = begin(v1) + i;
const auto blockEnd = begin(v1) + min(v1.size(), i + blockSize);
for (auto it = blockBegin; it != blockEnd; ++it) {
const InnerArr& a = *it;
if (!needsProcessing(a, b))
continue;
// Further processing involving a and b
++passedFilterCombinable.local();
specialValueCombinable.local() += inner_product(begin(a), end(a), begin(b), 0);
}
}
});
auto passedFilter = passedFilterCombinable.combine(plus<int>());
auto specialValue = specialValueCombinable.combine(plus<uint32_t>());
auto endTime = chrono::high_resolution_clock::now();
Report(endTime - startTime, v1.size(), v2.size(), passedFilter, specialValue);
}
int main() {
TestOriginalVersion();
TestArrayParallelVersion();
}
私の 8 コア システムではかなりの高速化が見られます。結果は、コアの数などによって異なります。
TestOriginalVersion
441579/400000000 (0.110395%) passed filter
2447300015
Elapsed time = 12525ms
TestArrayParallelVersion
441579/400000000 (0.110395%) passed filter
2447300015
Elapsed time = 657ms