趣味のプログラムを使用して、ハイ パフォーマンス コンピューティングのテクニックを独学しています。
私の PC には、32 GB のメモリと Microsoft vs2010 C コンパイラの無料バージョンを備えた Intel Ivy Bridge Core i7 3770 プロセッサが搭載されています。
64 ビット プログラムには、4 GB のルックアップ テーブルが 5 つあるため、約 20 GB のメモリが必要です (以下の bytevecM ... bytevecX)。この検索プログラムの内側のループは、別の C ファイルとして記述されています (後でアセンブラー バージョンに置き換えたい場合があるため)。以下に示します。
#define H_PRIME 1000003
int inner(
const char* bytevecM,
const char* bytevecD,
const char* bytevecC,
const char* bytevecL,
const char* bytevecX,
int startval, int endval,
int m2, int d2, int c2, int l2, int x2,
int* ps
)
{
int* psin = ps;
int qqm;
int m3, m4, m5, m6, m7;
int d3, d4, d5, d6, d7;
int c3, c4, c5, c6, c7;
int l3, l4, l5, l6, l7;
int x3, x4, x5, x6, x7;
int q3, q4, q5, q6, q7, q8;
for (q3 = startval; q3 < endval; ++q3)
{
if (q3 == 10 || q3 == 13) continue;
m3 = (m2 ^ q3) * H_PRIME;
d3 = (d2 ^ q3) * H_PRIME;
c3 = (c2 ^ q3) * H_PRIME;
l3 = (l2 ^ q3) * H_PRIME;
x3 = (x2 ^ q3) * H_PRIME;
for (q4 = 1; q4 < 128; ++q4)
{
if (q4 == 10 || q4 == 13) continue;
m4 = (m3 ^ q4) * H_PRIME;
d4 = (d3 ^ q4) * H_PRIME;
c4 = (c3 ^ q4) * H_PRIME;
l4 = (l3 ^ q4) * H_PRIME;
x4 = (x3 ^ q4) * H_PRIME;
for (q5 = 1; q5 < 128; ++q5)
{
if (q5 == 10 || q5 == 13) continue;
m5 = (m4 ^ q5) * H_PRIME;
d5 = (d4 ^ q5) * H_PRIME;
c5 = (c4 ^ q5) * H_PRIME;
l5 = (l4 ^ q5) * H_PRIME;
x5 = (x4 ^ q5) * H_PRIME;
for (q6 = 1; q6 < 128; ++q6)
{
if (q6 == 10 || q6 == 13) continue;
m6 = (m5 ^ q6) * H_PRIME;
d6 = (d5 ^ q6) * H_PRIME;
c6 = (c5 ^ q6) * H_PRIME;
l6 = (l5 ^ q6) * H_PRIME;
x6 = (x5 ^ q6) * H_PRIME;
for (q7 = 1; q7 < 128; ++q7)
{
if (q7 == 10 || q7 == 13) continue;
m7 = (m6 ^ q7) * H_PRIME;
d7 = (d6 ^ q7) * H_PRIME;
c7 = (c6 ^ q7) * H_PRIME;
l7 = (l6 ^ q7) * H_PRIME;
x7 = (x6 ^ q7) * H_PRIME;
for (q8 = 1; q8 < 128; ++q8)
{
if (q8 == 10 || q8 == 13) continue;
qqm = bytevecM[(unsigned int)(m7 ^ q8)];
if (qqm != 0
&& qqm == bytevecD[(unsigned int)(d7 ^ q8)]
&& qqm == bytevecC[(unsigned int)(c7 ^ q8)]
&& qqm == bytevecL[(unsigned int)(l7 ^ q8)]
&& qqm == bytevecX[(unsigned int)(x7 ^ q8)])
{
*ps++ = q3; *ps++ = q4; *ps++ = q5;
*ps++ = q6; *ps++ = q7; *ps++ = q8;
*ps++ = qqm;
}
}
}
}
}
}
}
return (int)(ps - psin);
}
ところで、上記のアルゴリズムは、開始範囲と終了範囲が異なる各スレッドで 1 つのコピーを実行することで、簡単に並列化できることに注意してください。
直感、Intel 組み込み関数、および各変更の個別のベンチマークを使用して、以下に示すように、実行時間を約 5 時間から 3 時間に短縮することができました。
#include <emmintrin.h>
#include <smmintrin.h>
#define H_PRIME 1000003
#define UNROLL(q8) qqm = bytevecM[(unsigned int)(m7 ^ q8)]; \
if (qqm != 0 \
&& qqm == bytevecD[(unsigned int)(s7.m128i_i32[0] ^ q8)] \
&& qqm == bytevecC[(unsigned int)(s7.m128i_i32[1] ^ q8)] \
&& qqm == bytevecL[(unsigned int)(s7.m128i_i32[2] ^ q8)] \
&& qqm == bytevecX[(unsigned int)(s7.m128i_i32[3] ^ q8)]) { \
ps[j++] = _mm_set_epi16(0, qqm, q8, q7, q6, q5, q4, q3); }
int inner(
const char* bytevecM,
const char* bytevecD,
const char* bytevecC,
const char* bytevecL,
const char* bytevecX,
int startval, int endval,
int m2, int d2, int c2, int l2, int x2,
__m128i* ps
)
{
__m128i s2 = _mm_set_epi32(x2, l2, c2, d2);
__m128i hp = _mm_set1_epi32(H_PRIME);
__m128i xt[128];
__m128i s3, s4, s5, s6, s7;
int qqm;
int m3, m4, m5, m6, m7;
int q3, q4, q5, q6, q7;
int j = 0;
int z; for (z = 1; z < 128; ++z) { xt[z] = _mm_set1_epi32(z); }
for (q3 = startval; q3 < endval; ++q3)
{
if (q3 == 10 || q3 == 13) continue;
m3 = (m2 ^ q3) * H_PRIME;
s3 = _mm_mullo_epi32(_mm_xor_si128(s2, xt[q3]), hp);
for (q4 = 1; q4 < 128; ++q4)
{
if (q4 == 10 || q4 == 13) continue;
m4 = (m3 ^ q4) * H_PRIME;
s4 = _mm_mullo_epi32(_mm_xor_si128(s3, xt[q4]), hp);
for (q5 = 1; q5 < 128; ++q5)
{
if (q5 == 10 || q5 == 13) continue;
m5 = (m4 ^ q5) * H_PRIME;
s5 = _mm_mullo_epi32(_mm_xor_si128(s4, xt[q5]), hp);
for (q6 = 1; q6 < 128; ++q6)
{
if (q6 == 10 || q6 == 13) continue;
m6 = (m5 ^ q6) * H_PRIME;
s6 = _mm_mullo_epi32(_mm_xor_si128(s5, xt[q6]), hp);
for (q7 = 1; q7 < 128; ++q7)
{
if (q7 == 10 || q7 == 13) continue;
m7 = (m6 ^ q7) * H_PRIME;
s7 = _mm_mullo_epi32(_mm_xor_si128(s6, xt[q7]), hp);
UNROLL(1)
UNROLL(96)
UNROLL(2)
UNROLL(3)
UNROLL(4)
UNROLL(5)
UNROLL(6)
UNROLL(7)
UNROLL(8)
UNROLL(9)
UNROLL(11)
UNROLL(12)
UNROLL(14)
// ... snipped UNROLL(15) .. UNROLL(125)
UNROLL(126)
UNROLL(127)
}
}
}
}
}
return j;
}
この高速化の大部分は、内側のループを手動で展開したことによるものです。
私は Intel SSE/AVX 命令に非常に慣れていないので、上記の何かを見て顔を引っ張った場合はお知らせください。
インテル VTune は、最大のホット スポットが次の行で発生すると報告しています。
UNROLL(1)
対応するアセンブリ コードで、ホット スポットを以下に示します。
mov eax, ecx 0.917s
mov edx, ecx 0.062s
xor rax, 0x1
movdqa xmmword ptr [rsp+0x20], xmm0
mov ebx, dword ptr [rsp+0x2c] 0.155s
mov r11d, dword ptr [rsp+0x28] 0.949s
movsx ecx, byte ptr [rax+rdi*1] 0.156s
mov r9d, dword ptr [rsp+0x24] 91.132s
mov r8d, dword ptr [rsp+0x20] 0.233s
test ecx, ecx
jz 0x14000225b
---
mov eax, r8d 0.342s
xor rax, 0x1 0.047s
movsx eax, byte ptr [rax+r13*1] 0.124s
cmp ecx, eax 12.631s
jnz 0x14000225b
---
mov eax, r9d
xor rax, 0x1
movsx eax, byte ptr [rax+r12*1]
cmp ecx, eax 0.016s
jnz 0x14000225b
これは、「データの局所性」の問題のように思えます。内側のループを通過するたびに、m7 の値は 4 GB の範囲で激しく予測できないほど変化するため、qqm=bytevecM[m7^1] を検索するときに、最初の UNROLL(1) でキャッシュ ミスが発生する可能性があります。
後続の UNROLL(2)..UNROLL(127) xor m7 と 2..127 があるため、通常、残りの UNROLL でキャッシュ ヒットが発生します。興味深いことに、UNROLL(96) を UNROLL(1) の直後に移動して UNROLL の順序を変更すると、大幅な速度向上が見られました。
メモリからバイトを読み取ると、そのバイトを含む (64 バイト) キャッシュ ラインがいっぱいになることを理解しています。
私はこの分野に非常に慣れていないので、特に大きなテーブル (私の場合は 4 GB テーブル) を扱う場合に、メモリ ルックアップを高速化する方法に関するアドバイスや参考文献を歓迎します。
上記のアルゴリズムでデータの局所性を改善する明確な方法がわかりません。それがどのように達成されるかについての提案も歓迎します。
2013 年 3 月 29 日更新
このノードが作成されてから、以下に示すように、実行時間を 3 時間から 20 分にさらに短縮することができました。
4 GB の bytevec ごとに 4 MB のビットマップを追加すると、約 40 分に短縮され、_mm_prefetch 呼び出しを追加することでさらに半分になりました。
基本的なアルゴリズムは変更されていないことに注意してください。ビットマップを追加することでデータの局所性が改善されました。_mm_prefetch 呼び出しを追加することで、レイテンシが短縮されました。
さらなるパフォーマンス改善のための提案を歓迎します。改善されたプログラムは次のとおりです。
#include <emmintrin.h>
#include <smmintrin.h>
#define H_PRIME 1000003
#define UNROLL(qx) qqm = bytevecM[m7 ^ qx]; \
if (qqm != 0 \
&& qqm == bytevecD[d7 ^ qx]) { \
_mm_prefetch(&bytevecC[c7 ^ qx], _MM_HINT_T0); \
qd[nqd++] = qqm; qd[nqd++] = qx; }
int inner(
const unsigned char* bitvecM,
const unsigned char* bitvecD,
const unsigned char* bitvecC,
const unsigned char* bitvecL,
const unsigned char* bitvecX,
const unsigned char* bitvecV,
const unsigned char* bitvecI,
const unsigned char* bytevecM,
const unsigned char* bytevecD,
const unsigned char* bytevecC,
const unsigned char* bytevecL,
const unsigned char* bytevecX,
int startval, int endval,
int m2, int d2, int c2, int l2, int x2, int v2, int i2,
__m128i* ps
)
{
__declspec(align(16)) __m128i s2 = _mm_set_epi32(i2, v2, x2, l2);
__declspec(align(16)) __m128i hp = _mm_set1_epi32(H_PRIME);
__declspec(align(16)) __m128i xt[128];
__declspec(align(16)) __m128i s3, s4, s5, s6;
int m3, m4, m5, m6;
int d3, d4, d5, d6;
int c3, c4, c5, c6;
unsigned int m7, d7, c7, l7, x7, v7, i7;
int qqm;
int q3, q4, q5, q6, q7, q8;
int iret = 0;
unsigned int qf[128*4];
int nqf;
int qz;
int qd[128*2];
int nqd;
int cnt;
int qh[128*3];
int nqh;
int qi[128*5];
int nqi;
unsigned int m7arr[128];
unsigned int d7arr[128];
const size_t* pbvM = (size_t*)bitvecM;
const size_t* pbvD = (size_t*)bitvecD;
const size_t* pbvC = (size_t*)bitvecC;
const size_t* pbvL = (size_t*)bitvecL;
const size_t* pbvX = (size_t*)bitvecX;
const size_t* pbvV = (size_t*)bitvecV;
const size_t* pbvI = (size_t*)bitvecI;
int z; for (z = 1; z < 128; ++z) { xt[z] = _mm_set1_epi32(z); }
for (q3 = startval; q3 < endval; ++q3)
{
if (q3 == 10 || q3 == 13) continue;
m3 = (m2 ^ q3) * H_PRIME;
d3 = (d2 ^ q3) * H_PRIME;
c3 = (c2 ^ q3) * H_PRIME;
s3 = _mm_mullo_epi32(_mm_xor_si128(s2, xt[q3]), hp);
for (q4 = 1; q4 < 128; ++q4)
{
if (q4 == 10 || q4 == 13) continue;
m4 = (m3 ^ q4) * H_PRIME;
d4 = (d3 ^ q4) * H_PRIME;
c4 = (c3 ^ q4) * H_PRIME;
s4 = _mm_mullo_epi32(_mm_xor_si128(s3, xt[q4]), hp);
for (q5 = 1; q5 < 128; ++q5)
{
if (q5 == 10 || q5 == 13) continue;
m5 = (m4 ^ q5) * H_PRIME;
d5 = (d4 ^ q5) * H_PRIME;
c5 = (c4 ^ q5) * H_PRIME;
s5 = _mm_mullo_epi32(_mm_xor_si128(s4, xt[q5]), hp);
for (q6 = 1; q6 < 128; ++q6)
{
if (q6 == 10 || q6 == 13) continue;
m6 = (m5 ^ q6) * H_PRIME;
d6 = (d5 ^ q6) * H_PRIME;
c6 = (c5 ^ q6) * H_PRIME;
s6 = _mm_mullo_epi32(_mm_xor_si128(s5, xt[q6]), hp);
for (q7 = 1; q7 < 128; ++q7)
{
if (q7 == 10 || q7 == 13) continue;
m7arr[q7] = (unsigned int)( (m6 ^ q7) * H_PRIME );
_mm_prefetch((const char*)(&pbvM[m7arr[q7] >> 13]), _MM_HINT_T0);
d7arr[q7] = (unsigned int)( (d6 ^ q7) * H_PRIME );
_mm_prefetch((const char*)(&pbvD[d7arr[q7] >> 13]), _MM_HINT_T0);
}
nqh = 0;
for (q7 = 1; q7 < 128; ++q7)
{
if (q7 == 10 || q7 == 13) continue;
if ( (pbvM[m7arr[q7] >> 13] & ((size_t)1 << ((m7arr[q7] >> 7) & 63))) == 0 ) continue;
if ( (pbvD[d7arr[q7] >> 13] & ((size_t)1 << ((d7arr[q7] >> 7) & 63))) == 0 ) continue;
c7 = (unsigned int)( (c6 ^ q7) * H_PRIME );
_mm_prefetch((const char*)(&pbvC[c7 >> 13]), _MM_HINT_T0);
l7 = (unsigned int)( (s6.m128i_i32[0] ^ q7) * H_PRIME );
_mm_prefetch((const char*)(&pbvL[l7 >> 13]), _MM_HINT_T0);
qh[nqh++] = q7;
qh[nqh++] = c7;
qh[nqh++] = l7;
}
nqi = 0;
cnt = 0;
while (cnt < nqh)
{
q7 = qh[cnt++];
c7 = qh[cnt++];
l7 = qh[cnt++];
if ( (pbvC[c7 >> 13] & ((size_t)1 << ((c7 >> 7) & 63))) == 0 ) continue;
if ( (pbvL[l7 >> 13] & ((size_t)1 << ((l7 >> 7) & 63))) == 0 ) continue;
x7 = (unsigned int)( (s6.m128i_i32[1] ^ q7) * H_PRIME );
_mm_prefetch((const char*)(&pbvX[x7 >> 13]), _MM_HINT_T0);
v7 = (unsigned int)( (s6.m128i_i32[2] ^ q7) * H_PRIME );
_mm_prefetch((const char*)(&pbvV[v7 >> 13]), _MM_HINT_T0);
qi[nqi++] = q7;
qi[nqi++] = c7;
qi[nqi++] = l7;
qi[nqi++] = x7;
qi[nqi++] = v7;
}
nqf = 0;
cnt = 0;
while (cnt < nqi)
{
q7 = qi[cnt++];
c7 = qi[cnt++];
l7 = qi[cnt++];
x7 = qi[cnt++];
v7 = qi[cnt++];
if ( (pbvX[x7 >> 13] & ((size_t)1 << ((x7 >> 7) & 63))) == 0 ) continue;
if ( (pbvV[v7 >> 13] & ((size_t)1 << ((v7 >> 7) & 63))) == 0 ) continue;
i7 = (unsigned int)( (s6.m128i_i32[3] ^ q7) * H_PRIME );
if ( (pbvI[i7 >> 13] & ((size_t)1 << ((i7 >> 7) & 63))) == 0 ) continue;
_mm_prefetch(&bytevecD[d7arr[q7] & 0xffffff80], _MM_HINT_T0);
_mm_prefetch(&bytevecD[64+(d7arr[q7] & 0xffffff80)], _MM_HINT_T0);
qf[nqf++] = q7;
qf[nqf++] = c7;
qf[nqf++] = l7;
qf[nqf++] = x7;
}
cnt = 0;
while (cnt < nqf)
{
q7 = qf[cnt];
cnt += 4;
_mm_prefetch(&bytevecM[m7arr[q7] & 0xffffff80], _MM_HINT_T0);
_mm_prefetch(&bytevecM[64+(m7arr[q7] & 0xffffff80)], _MM_HINT_T0);
}
qz = 0;
while (qz < nqf)
{
q7 = qf[qz++];
c7 = qf[qz++];
l7 = qf[qz++];
x7 = qf[qz++];
m7 = m7arr[q7];
d7 = d7arr[q7];
nqd = 0;
UNROLL(1)
UNROLL(96)
UNROLL(2)
UNROLL(3)
UNROLL(4)
UNROLL(5)
UNROLL(6)
UNROLL(7)
UNROLL(8)
UNROLL(9)
UNROLL(11)
UNROLL(12)
UNROLL(14)
// ... snipped UNROLL(15) .. UNROLL(125)
UNROLL(126)
UNROLL(127)
if (nqd == 0) continue;
cnt = 0;
while (cnt < nqd)
{
qqm = qd[cnt++];
q8 = qd[cnt++];
if (qqm == bytevecC[c7 ^ q8]
&& qqm == bytevecL[l7 ^ q8]
&& qqm == bytevecX[x7 ^ q8])
{
ps[iret++] = _mm_set_epi16(0, qqm, q8, q7, q6, q5, q4, q3);
}
}
}
}
}
}
}
return iret;
}