3

MPI-2/MPI-3 で導入された MPI の片側通信を学習していて、次のオンライン コース ページに出会いましたMPI_Accumulate

MPI_Accumulate を使用すると、呼び出し元は、ターゲット プロセスに移動されたデータを、ターゲット プロセスでの合計の累積など、既に存在するデータと組み合わせることができます。同じ機能は、MPI_Get を使用してデータを取得することで実現できます (同期が続きます)。呼び出し元で合計操作を実行します。次に、MPI_Put を使用して、更新されたデータをターゲット プロセスに送り返します。Accumulate はこの混乱を単純化します ...

ただし、MPI_Accumulate(max、min、sum、product など) と共に使用できる操作は限られており、ユーザー定義の操作は許可されていません。、sync、op、およびを使用して、上記の乱雑さを実装する方法を考えていました。C/C++ のチュートリアルや実用的なコード例はありますか?MPI_GetMPI_Put

ありがとう


テストするために、このSO questionのコードを適用しました。このコードでは、MPI プロセス間で同期が維持される整数カウンターを作成するために片側通信が使用されます。使用している対象の問題行MPI_Accumulateがマークされています。

コードはそのままコンパイルされ、約 15 秒で返されます。MPI_Accumulateしかし、問題行の直後のコメント ブロックに示されているように、同等の一連の基本操作に置き換えようとすると、コンパイルされたプログラムが無期限にハングします。

何がうまくいかなかったのかMPI_Accumulate、このコンテキストで置き換える正しい方法は何ですか?

PS私はコードをコンパイルしました

g++ -std=c++11 -I..   mpistest.cpp -lmpi

でバイナリを実行しました

mpiexec -n 4 a.exe

コード:

//adpated from https://stackoverflow.com/questions/4948788/
#include <mpi.h>
#include <stdlib.h>
#include <stdio.h>
#include <thread>
#include <chrono>

struct mpi_counter_t {
  MPI_Win win;
  int  hostrank;  //id of the process that host values to be exposed to all processes
  int  rank;    //process id
  int  size;     //number of processes
  int  val;      
  int  *hostvals; 
};

struct mpi_counter_t *create_counter(int hostrank) {
    struct mpi_counter_t *count;

    count = (struct mpi_counter_t *)malloc(sizeof(struct mpi_counter_t));
    count->hostrank = hostrank;
    MPI_Comm_rank(MPI_COMM_WORLD, &(count->rank));
    MPI_Comm_size(MPI_COMM_WORLD, &(count->size));

    if (count->rank == hostrank) {
        MPI_Alloc_mem(count->size * sizeof(int), MPI_INFO_NULL, &(count->hostvals));
        for (int i=0; i<count->size; i++) count->hostvals[i] = 0;
        MPI_Win_create(count->hostvals, count->size * sizeof(int), sizeof(int),
                       MPI_INFO_NULL, MPI_COMM_WORLD, &(count->win));
    } 
    else {
        count->hostvals = NULL;
        MPI_Win_create(count->hostvals, 0, 1,
                       MPI_INFO_NULL, MPI_COMM_WORLD, &(count->win));
    }
    count -> val = 0;

    return count;
}

int increment_counter(struct mpi_counter_t *count, int increment) {
    int *vals = (int *)malloc( count->size * sizeof(int) );
    int val;

    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, count->hostrank, 0, count->win);

    for (int i=0; i<count->size; i++) {

        if (i == count->rank) {
        MPI_Accumulate(&increment, 1, MPI_INT, 0, i, 1, MPI_INT, MPI_SUM,count->win); //Problem line: increment hostvals[i] on host
        /* //Question: How to correctly replace the above MPI_Accumulate call with the following sequence? Currently, the following causes the program to hang.
            MPI_Get(&vals[i], 1, MPI_INT, 0, i, 1, MPI_INT, count->win);
            MPI_Win_fence(0,count->win);
            vals[i] += increment;
            MPI_Put(&vals[i], 1, MPI_INT, 0, i, 1, MPI_INT, count->win);
            MPI_Win_fence(0,count->win);
        //*/
        } else {
            MPI_Get(&vals[i], 1, MPI_INT, 0, i, 1, MPI_INT, count->win);
        }
    }

    MPI_Win_unlock(0, count->win);

    //do op part of MPI_Accumulate's work on count->rank
    count->val += increment; 
    vals[count->rank] = count->val; 

    //return the sum of vals
    val = 0;
    for (int i=0; i<count->size; i++)
        val += vals[i];

    free(vals);
    return val;
}

void delete_counter(struct mpi_counter_t **count) {
    if ((*count)->rank == (*count)->hostrank) {
        MPI_Free_mem((*count)->hostvals);
    }
    MPI_Win_free(&((*count)->win));
    free((*count));
    *count = NULL;

    return;
}

void print_counter(struct mpi_counter_t *count) {
    if (count->rank == count->hostrank) {
        for (int i=0; i<count->size; i++) {
            printf("%2d ", count->hostvals[i]);
        }
        puts("");
    }
}


int main(int argc, char **argv) {

    MPI_Init(&argc, &argv);

    const int WORKITEMS=50;

    struct mpi_counter_t *c;
    int rank;
    int result = 0;

    c = create_counter(0);

    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    srand(rank);

    while (result < WORKITEMS) {
        result = increment_counter(c, 1);
        if (result <= WORKITEMS) {
             printf("%d working on item %d...\n", rank, result);
         std::this_thread::sleep_for (std::chrono::seconds(rand()%2));
         } else {
             printf("%d done\n", rank);
         }
    }

    MPI_Barrier(MPI_COMM_WORLD);
    print_counter(c);
    delete_counter(&c);


    MPI_Finalize();
    return 0;
}

もう 1 つ質問がMPI_Win_fenceあります。ここでは、ロックの代わりに使用する必要がありますか?

- 編集 -

次のようにロック/ロック解除を使用しましincrement_counterた。プログラムは実行されますが、奇妙な動作をします。最終的な出力では、マスター ノードがすべての作業を行います。まだ混乱しています。

int increment_counter(struct mpi_counter_t *count, int increment) {
    int *vals = (int *)malloc( count->size * sizeof(int) );
    int val;

    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, count->hostrank, 0, count->win);

    for (int i=0; i<count->size; i++) {

        if (i == count->rank) {
            //MPI_Accumulate(&increment, 1, MPI_INT, 0, i, 1, MPI_INT, MPI_SUM,count->win); //Problem line: increment hostvals[i] on host
            ///* //Question: How to correctly replace the above MPI_Accumulate call with the following sequence? reports that 0 does all the work
            MPI_Get(&vals[i], 1, MPI_INT, 0, i, 1, MPI_INT, count->win);
            MPI_Win_unlock(0, count->win);
            vals[i] += increment;
            MPI_Put(&vals[i], 1, MPI_INT, 0, i, 1, MPI_INT, count->win);
            MPI_Win_lock(MPI_LOCK_EXCLUSIVE, count->hostrank, 0, count->win);
        //*/
        } else {
            MPI_Get(&vals[i], 1, MPI_INT, 0, i, 1, MPI_INT, count->win);
        }
    }

    MPI_Win_unlock(0, count->win);

    //do op part of MPI_Accumulate's work on count->rank
    count->val += increment; 
    vals[count->rank] = count->val; 

    //return the sum of vals
    val = 0;
    for (int i=0; i<count->size; i++)
        val += vals[i];

    free(vals);
    return val;
}
4

1 に答える 1