6

I started a similar question on another thread, but then I was focusing on how to use OpenCV. Having failed to achieve what I originally wanted, I will ask here exactly what I want.

I have two matrices. Matrix a is 2782x128 and Matrix b is 4000x128, both unsigned char values. The values are stored in a single array. For each vector in a, I need the index of the vector in b with the closest euclidean distance.

Ok, now my code to achieve this:

#include <windows.h>
#include <stdlib.h>
#include <stdio.h>
#include <cstdio>
#include <math.h>
#include <time.h>
#include <sys/timeb.h>
#include <iostream>
#include <fstream>
#include "main.h"

using namespace std;

void main(int argc, char* argv[])
{
    int a_size;
    unsigned char* a = NULL;
    read_matrix(&a, a_size,"matrixa");
    int b_size;
    unsigned char* b = NULL;
    read_matrix(&b, b_size,"matrixb");

    LARGE_INTEGER liStart;
    LARGE_INTEGER liEnd;
    LARGE_INTEGER liPerfFreq;
    QueryPerformanceFrequency( &liPerfFreq );
    QueryPerformanceCounter( &liStart );

    int* indexes = NULL;
    min_distance_loop(&indexes, b, b_size, a, a_size);

    QueryPerformanceCounter( &liEnd );

    cout << "loop time: " << (liEnd.QuadPart - liStart.QuadPart) / long double(liPerfFreq.QuadPart) << "s." << endl;

    if (a)
    delete[]a;
if (b)
    delete[]b;
if (indexes)
    delete[]indexes;
    return;
}

void read_matrix(unsigned char** matrix, int& matrix_size, char* matrixPath)
{
    ofstream myfile;
    float f;
    FILE * pFile;
    pFile = fopen (matrixPath,"r");
    fscanf (pFile, "%d", &matrix_size);
    *matrix = new unsigned char[matrix_size*128];

    for (int i=0; i<matrix_size*128; ++i)
    {
        unsigned int matPtr;
        fscanf (pFile, "%u", &matPtr);
        matrix[i]=(unsigned char)matPtr;
    }
    fclose (pFile);
}

void min_distance_loop(int** indexes, unsigned char* b, int b_size, unsigned char* a, int a_size)
{
    const int descrSize = 128;

    *indexes = (int*)malloc(a_size*sizeof(int));
    int dataIndex=0;
    int vocIndex=0;
    int min_distance;
    int distance;
    int multiply;

    unsigned char* dataPtr;
    unsigned char* vocPtr;
    for (int i=0; i<a_size; ++i)
    {
        min_distance = LONG_MAX;
        for (int j=0; j<b_size; ++j)
        {
            distance=0;
            dataPtr = &a[dataIndex];
            vocPtr = &b[vocIndex];

            for (int k=0; k<descrSize; ++k)
            {
                multiply = *dataPtr++-*vocPtr++;
                distance += multiply*multiply;
                // If the distance is greater than the previously calculated, exit
                if (distance>min_distance)
                    break;
            }

            // if distance smaller
            if (distance<min_distance)
            {
                min_distance = distance;
                (*indexes)[i] = j;
            }
            vocIndex+=descrSize;
        }
        dataIndex+=descrSize;
        vocIndex=0;
    }
}

And attached are the files with sample matrices.

matrixa matrixb

I am using windows.h just to calculate the consuming time, so if you want to test the code in another platform than windows, just change windows.h header and change the way of calculating the consuming time.

This code in my computer is about 0.5 seconds. The problem is that I have another code in Matlab that makes this same thing in 0.05 seconds. In my experiments, I am receiving several matrices like matrix a every second, so 0.5 seconds is too much.

Now the matlab code to calculate this:

aa=sum(a.*a,2); bb=sum(b.*b,2); ab=a*b'; 
d = sqrt(abs(repmat(aa,[1 size(bb,1)]) + repmat(bb',[size(aa,1) 1]) - 2*ab));
[minz index]=min(d,[],2);

Ok. Matlab code is using that (x-a)^2 = x^2 + a^2 - 2ab.

So my next attempt was to do the same thing. I deleted my own code to make the same calculations, but It was 1.2 seconds approx.

Then, I tried to use different external libraries. The first attempt was Eigen:

const int descrSize = 128;
MatrixXi a(a_size, descrSize);
MatrixXi b(b_size, descrSize);
MatrixXi ab(a_size, b_size);

unsigned char* dataPtr = matrixa;
for (int i=0; i<nframes; ++i)
{
    for (int j=0; j<descrSize; ++j)
    {
        a(i,j)=(int)*dataPtr++;
    }
}
unsigned char* vocPtr = matrixb;
for (int i=0; i<vocabulary_size; ++i)
{
    for (int j=0; j<descrSize; ++j)
    {
        b(i,j)=(int)*vocPtr ++;
    }
}
ab = a*b.transpose();
a.cwiseProduct(a);
b.cwiseProduct(b);
MatrixXi aa = a.rowwise().sum();
MatrixXi bb = b.rowwise().sum();
MatrixXi d = (aa.replicate(1,vocabulary_size) + bb.transpose().replicate(nframes,1) - 2*ab).cwiseAbs2();

int* index = NULL;
index = (int*)malloc(nframes*sizeof(int));
for (int i=0; i<nframes; ++i)
{
    d.row(i).minCoeff(&index[i]);
}

This Eigen code costs 1.2 approx for just the line that says: ab = a*b.transpose();

A similar code using opencv was used also, and the cost of the ab = a*b.transpose(); was 0.65 seconds.

So, It is real annoying that matlab is able to do this same thing so quickly and I am not able in C++! Of course being able to run my experiment would be great, but I think the lack of knowledge is what really is annoying me. How can I achieve at least the same performance than in Matlab? Any kind of soluting is welcome. I mean, any external library (free if possible), loop unrolling things, template things, SSE intructions (I know they exist), cache things. As I said, my main purpose is increase my knowledge for being able to code thinks like this with a faster performance.

Thanks in advance

EDIT: more code suggested by David Hammen. I casted the arrays to int before making any calculations. Here is the code:

void min_distance_loop(int** indexes, unsigned char* b, int b_size, unsigned char* a, int a_size)
{
    const int descrSize = 128;

    int* a_int;
    int* b_int;

    LARGE_INTEGER liStart;
    LARGE_INTEGER liEnd;
    LARGE_INTEGER liPerfFreq;
    QueryPerformanceFrequency( &liPerfFreq );
    QueryPerformanceCounter( &liStart );

    a_int = (int*)malloc(a_size*descrSize*sizeof(int));
    b_int = (int*)malloc(b_size*descrSize*sizeof(int));

    for(int i=0; i<descrSize*a_size; ++i)
        a_int[i]=(int)a[i];
    for(int i=0; i<descrSize*b_size; ++i)
        b_int[i]=(int)b[i];

    QueryPerformanceCounter( &liEnd );

    cout << "Casting time: " << (liEnd.QuadPart - liStart.QuadPart) / long double(liPerfFreq.QuadPart) << "s." << endl;

    *indexes = (int*)malloc(a_size*sizeof(int));
    int dataIndex=0;
    int vocIndex=0;
    int min_distance;
    int distance;
    int multiply;

    /*unsigned char* dataPtr;
    unsigned char* vocPtr;*/
    int* dataPtr;
    int* vocPtr;
    for (int i=0; i<a_size; ++i)
    {
        min_distance = LONG_MAX;
        for (int j=0; j<b_size; ++j)
        {
            distance=0;
            dataPtr = &a_int[dataIndex];
            vocPtr = &b_int[vocIndex];

            for (int k=0; k<descrSize; ++k)
            {
                multiply = *dataPtr++-*vocPtr++;
                distance += multiply*multiply;
                // If the distance is greater than the previously calculated, exit
                if (distance>min_distance)
                    break;
            }

            // if distance smaller
            if (distance<min_distance)
            {
                min_distance = distance;
                (*indexes)[i] = j;
            }
            vocIndex+=descrSize;
        }
        dataIndex+=descrSize;
        vocIndex=0;
    }
}

The entire process is now 0.6, and the casting loops at the beginning are 0.001 seconds. Maybe I did something wrong?

EDIT2: Anything about Eigen? When I look for external libs they always talk about Eigen and their speed. I made something wrong? Here a simple code using Eigen that shows it is not so fast. Maybe I am missing some config or some flag, or ...

MatrixXd A = MatrixXd::Random(1000, 1000);
MatrixXd B = MatrixXd::Random(1000, 500);
MatrixXd X;

This code is about 0.9 seconds.

4

3 に答える 3

2

C++ コードで確実に問題になっていることの 1 つは、char から int への変換が大量にあることです。ボートロードとは、最大 2*2782*4000*128 文字から int への変換を意味します。char変換へのそれらintは遅く、非常に遅いです。

これを (2782+4000)*128 のような変換に減らすにはint、1 つは 2782*128 で、もう 1 つは 4000*128 の配列のペアを割り当てて、char* aおよびchar* b配列の整数へのキャストの内容を含めます。int*自分の配列ではなく、これらの配列を操作してくださいchar*

もう 1 つの問題は、intvsの使用にある可能性がありますlong。私はWindowsで作業していないので、これは当てはまらないかもしれません。私が作業しているマシンでは、intは 32 ビットで、long現在は 64 ビットです。255*255*128 < 256*256*128 = 2 23であるため、32 ビットで十分です。

それは明らかに問題ではありません。

驚くべきことは、問題のコードが、Matlab コードが作成している 2728 x 4000 の巨大な配列を計算していないことです。さらに印象的なのは、Matlab がこれを int ではなく double で行う可能性が最も高いことです。これは、C/C++ コードよりも優れています。

大きな問題の 1 つはキャッシュです。その 4000*128 配列はレベル 1 キャッシュには大きすぎます。その大きな配列を 2782 回反復しています。あなたのコードは、メモリを待ちすぎています。この問題を解決するには、b配列のチャンクを小さくして、コードがレベル 1 キャッシュで可能な限り長く動作するようにします。

別の問題は最適化if (distance>min_distance) break;です。これは実際には非最適化であると思われます。if最も内側のループ内にテストを配置することは、多くの場合、悪い考えです。その内積をできるだけ速く吹き飛ばします。無駄な計算を除けば、このテストをなくしても害はありません。最も内側のループの分岐を削除できる場合は、明らかに不要な計算を行う方がよい場合があります。これはそれらのケースの 1 つです。このテストを排除するだけで、問題を解決できる場合があります。そうしてみてください。

キャッシュの問題に戻ると、このブランチを取り除く必要があります。これにより、abマトリックスに対する操作を、一度に 256 行以下の小さなチャンクに分割できます。これは、最新の 2 つの Intel チップの L1 キャッシュの 1 つに収まる 128 個の符号なし文字の行数です。250 は 4000 を割るので、そのb行列を論理的に 16 のチャンクに分割することを検討してください。内積の大きな 2872 x 4000 配列を形成したいかもしれませんが、それは小さなチャンクで行います。それを元に戻すことはできますがif (distance>min_distance) break;、バイト単位ではなくチャンク レベルで行ってください。

ほぼ確実に double で動作するので、Matlab を打ち負かすことができるはずですが、unsigned char と int で動作する可能性があります。

于 2012-09-26T09:22:50.693 に答える
1

行列乗算は、一般に、2 つの行列の 1 つに対して最悪のキャッシュ アクセス パターンを使用します。解決策は、行列の 1 つを転置し、そのように格納されたデータに対して機能する特殊な乗算アルゴリズムを使用することです。

あなたの行列はすでに転置されて保存されています。それを通常の順序に転置してから通常の行列乗算を使用すると、パフォーマンスが完全に低下します。

インデックスの順序を 2 番目の行列に反転する独自の行列乗算ループを記述します (実際には何も移動せず、キャッシュの動作を壊すことなく、転置する効果があります)。そして、自動ベクトル化を有効にするためのオプションが何であれ、コンパイラーに渡します。

于 2012-09-26T14:18:53.377 に答える