1

GPU ベースのコンウェイのライフ ゲーム プログラムの作成に取り組んできました。よくわからない場合は、ウィキペディアのページをご覧ください。0 が死んだセルを表し、1 が生きているセルを表す値の配列を保持することで機能する 1 つのバージョンを作成しました。次にカーネルは、セル データに基づいて画像を描画するために画像バッファー データ配列に書き込み、次にレンダリングする次の実行のためにセル配列を更新するために各セルの隣接をチェックします。

ただし、より高速な方法では、セルの値が死んでいる場合は負の数、生きている場合は正の数で表されます。そのセルの数は、隣接セルの数に 1 を加えた値を表します (0 と -0 を区別できないため、0 は不可能な値になります)。ただし、これは、セルをスポーンまたはキルするときに、それに応じて 8 つの隣接セルの値を更新する必要があることを意味します。したがって、隣接するメモリスロットから読み取るだけでよい作業手順とは異なり、この手順ではそれらのスロットに書き込む必要があります。これを行うと一貫性がなくなり、出力された配列が無効になります。たとえば、セルには 14 などの数字が含まれており、これは 13 個の隣人を示しますが、これはありえない値です。CPUで同じ手順を書いたので、コードは正しく、期待どおりに動作します。テスト後、タスクが同時にメモリに書き込もうとすると、何らかの書き込みエラーにつながる遅延があると思います。たとえば、配列データの読み取りと設定の間に遅延があり、その時点でデータが変更され、別のタスクの手順が正しくない可能性があります。私はセマフォとバリアを使用してみましたが、OpenCL と並列処理を学習したばかりで、まだ完全には把握していません。カーネルは次のとおりです。

int wrap(int val, int limit){
    int response = val;
    if(response<0){response+=limit;}
    if(response>=limit){response-=limit;}
    return response;
}

__kernel void optimizedModel(
        __global uint *output,
        int sizeX, int sizeY,
        __global uint *colorMap,
        __global uint *newCellMap,
        __global uint *historyBuffer
)
{
    // the x and y coordinates that currently being computed
    unsigned int x = get_global_id(0);
    unsigned int y = get_global_id(1);

    int cellValue = historyBuffer[sizeX*y+x];
    int neighborCount = abs(cellValue)-1;
    output[y*sizeX+x] = colorMap[cellValue > 0 ? 1 : 0];

    if(cellValue > 0){// if alive
        if(neighborCount < 2 || neighborCount > 3){
            // kill

            for(int i=-1; i<2; i++){
                for(int j=-1; j<2; j++){
                    if(i!=0 || j!=0){
                        int wxc = wrap(x+i, sizeX);
                        int wyc = wrap(y+j, sizeY);
                        newCellMap[sizeX*wyc+wxc] -= newCellMap[sizeX*wyc+wxc] > 0 ? 1 : -1;
                    }
                }
            }
            newCellMap[sizeX*y+x] *= -1;

            // end kill
        }
    }else{
        if(neighborCount==3){
            // spawn

            for(int i=-1; i<2; i++){
                for(int j=-1; j<2; j++){
                    if(i!=0 || j!=0){
                        int wxc = wrap(x+i, sizeX);
                        int wyc = wrap(y+j, sizeY);
                        newCellMap[sizeX*wyc+wxc] += newCellMap[sizeX*wyc+wxc] > 0 ? 1 : -1;
                    }
                }
            }
            newCellMap[sizeX*y+x] *= -1;

            // end spawn
        }
    }
}
  1. 配列出力は、カーネルの計算をレンダリングするために使用されるイメージ バッファー データです。
  2. sizeX定数とsizeY定数は、それぞれ画像バッファーの幅と高さです。
  3. colorMap配列には、イメージ バッファの値を適切に変更して色をレンダリングするために使用される、それぞれ黒と白の rgb 整数値が含まれています。
  4. newCellMap配列は、レンダリングが決定されると計算される更新されたセル マップです
  5. historyBufferは、カーネル呼び出しの開始時のセルの古い状態です。カーネルが実行されるたびに、この配列は newCellMap 配列に更新されます。

さらに、ラップ機能は空間をトロイダルにします。期待どおりに動作するようにこのコードを修正するにはどうすればよいですか。また、タスクによる変更のたびにグローバル メモリが更新されないのはなぜでしょうか。共有メモリではないでしょうか?

4

2 に答える 2

1

シャープネリが彼の答えで言ったように、異なるスレッドから同じメモリゾーンを読み書きしているため、未定義の動作が発生します。

解決策: 2 つの配列 に分割する必要がありますnewCellMap。1 つは前の実行用で、もう 1 つは新しい値が格納される場所です。次に、各呼び出しでホスト側からカーネル引数を変更してoldvalues、次の反復がnewvalues前の反復のものになるようにする必要があります。アルゴリズムをどのように構造化するかにより、実行する前にoldvaluestoの copybuffer も実行する必要がありnewvaluesます。

__kernel void optimizedModel(
        __global uint *output,
        int sizeX, int sizeY,
        __global uint *colorMap,
        __global uint *oldCellMap,
        __global uint *newCellMap,
        __global uint *historyBuffer
)
{
    // the x and y coordinates that currently being computed
    unsigned int x = get_global_id(0);
    unsigned int y = get_global_id(1);

    int cellValue = historyBuffer[sizeX*y+x];
    int neighborCount = abs(cellValue)-1;
    output[y*sizeX+x] = colorMap[cellValue > 0 ? 1 : 0];

    if(cellValue > 0){// if alive
        if(neighborCount < 2 || neighborCount > 3){
            // kill

            for(int i=-1; i<2; i++){
                for(int j=-1; j<2; j++){
                    if(i!=0 || j!=0){
                        int wxc = wrap(x+i, sizeX);
                        int wyc = wrap(y+j, sizeY);
                        newCellMap[sizeX*wyc+wxc] -= oldCellMap[sizeX*wyc+wxc] > 0 ? 1 : -1;
                    }
                }
            }
            newCellMap[sizeX*y+x] *= -1;

            // end kill
        }
    }else{
        if(neighborCount==3){
            // spawn

            for(int i=-1; i<2; i++){
                for(int j=-1; j<2; j++){
                    if(i!=0 || j!=0){
                        int wxc = wrap(x+i, sizeX);
                        int wyc = wrap(y+j, sizeY);
                        newCellMap[sizeX*wyc+wxc] += oldCellMap[sizeX*wyc+wxc] > 0 ? 1 : -1;
                    }
                }
            }
            newCellMap[sizeX*y+x] *= -1;

            // end spawn
        }
    }
}

共有メモリに関する質問については、簡単な答えがあります。OpenCL には、 HOST-DEVICE 間で共有メモリがありません

デバイスのメモリ バッファを作成するときは、最初にそのメモリ ゾーンを初期化し、結果を取得するためclEnqueueWriteBuffer()にそれを読み取る必要clEnqueueWriteBuffer()があります。メモリ ゾーンへのポインターがある場合でも、ポインターはそのゾーンのホスト側のコピーへのポインターです。これは、デバイス計算出力の最新バージョンを持っていない可能性があります。

PD: 私はかなり前に OpenCL で「ライブ」ゲームを作成しましたが、それを行うためのより簡単で高速な方法は、ビットの大きな 2D 配列 (ビット アドレス指定) を作成することであることがわかりました。次に、単純に近隣を分析し、そのセルの更新された値を取得する分岐なしのコードを記述します。ビット アドレス指定が使用されるため、各スレッドによるメモリの読み取り/書き込みの量は、chars/ints/other のアドレス指定よりも大幅に少なくなります。非常に古い OpenCL HW (nVIDIA 9100MG) で 33Mcells/sec を達成しました。if/else アプローチはおそらく最も効率的なものではないことをお知らせします。

于 2013-11-18T09:56:46.093 に答える
1

参考までに、ライフ ゲーム (OpenCL カーネル) の実装をここに示します。

//Each work-item processess one 4x2 block of cells, but needs to access to the (3x3)x(4x2) block of cells surrounding it
//    . . . . . .
//    . * * * * .
//    . * * * * .
//    . . . . . .

 __kernel void life (__global unsigned char * input, __global unsigned char * output){

    int x_length = get_global_size(0);
    int x_id = get_global_id(0);
    int y_length = get_global_size(1);
    int y_id = get_global_id(1);
    //int lx_length = get_local_size(0);
    //int ly_length = get_local_size(1);

    int x_n = (x_length+x_id-1)%x_length; //Negative X
    int x_p = (x_length+x_id+1)%x_length; //Positive X
    int y_n = (y_length+y_id-1)%y_length; //Negative Y
    int y_p = (y_length+y_id+1)%y_length; //Positive X

    //Get the data of the surrounding blocks (TODO: Make this shared across the local group)
    unsigned char block[3][3];
    block[0][0] = input[x_n + y_n*x_length];
    block[1][0] = input[x_id + y_n*x_length];
    block[2][0] = input[x_p + y_n*x_length];
    block[0][1] = input[x_n + y_id*x_length];
    block[1][1] = input[x_id + y_id*x_length];
    block[2][1] = input[x_p + y_id*x_length];
    block[0][2] = input[x_n + y_p*x_length];
    block[1][2] = input[x_id + y_p*x_length];
    block[2][2] = input[x_p + y_p*x_length];

    //Expand the block to points (bool array)
    bool point[6][4];
    point[0][0] = (bool)(block[0][0] & 1);
    point[1][0] = (bool)(block[1][0] & 8);
    point[2][0] = (bool)(block[1][0] & 4);
    point[3][0] = (bool)(block[1][0] & 2);
    point[4][0] = (bool)(block[1][0] & 1);
    point[5][0] = (bool)(block[2][0] & 8);
    point[0][1] = (bool)(block[0][1] & 16);
    point[1][1] = (bool)(block[1][1] & 128);
    point[2][1] = (bool)(block[1][1] & 64);
    point[3][1] = (bool)(block[1][1] & 32);
    point[4][1] = (bool)(block[1][1] & 16);
    point[5][1] = (bool)(block[2][1] & 128);
    point[0][2] = (bool)(block[0][1] & 1);
    point[1][2] = (bool)(block[1][1] & 8);
    point[2][2] = (bool)(block[1][1] & 4);
    point[3][2] = (bool)(block[1][1] & 2);
    point[4][2] = (bool)(block[1][1] & 1);
    point[5][2] = (bool)(block[2][1] & 8);
    point[0][3] = (bool)(block[0][2] & 16);
    point[1][3] = (bool)(block[1][2] & 128);
    point[2][3] = (bool)(block[1][2] & 64);
    point[3][3] = (bool)(block[1][2] & 32);
    point[4][3] = (bool)(block[1][2] & 16);
    point[5][3] = (bool)(block[2][2] & 128);

    //Process one point of the game of life!
    unsigned char out = (unsigned char)0;
    for(int j=0; j<2; j++){
        for(int i=0; i<4; i++){
            char num = point[i][j] + point[i+1][j] + point[i+2][j] + point[i][j+1] + point[i+2][j+1] + point[i][j+2] + point[i+1][j+2] + point[i+2][j+2];
            if(num == 3 || num == 2 && point[i+1][j+1] ){
                out |= (128>>(i+4*j));
            }
        }
    }
    output[x_id + y_id*x_length] = out; //Assign to the output the new cells value
};

ここでは、中間の状態は保存せず、最後のセルの状態 (生/死) だけを保存します。分岐がないため、プロセスが非常に高速です。

于 2013-11-18T20:29:35.840 に答える