6

私は、基本的な通信およびグループMPI2メソッドを使用した経験が豊富で、MPIを使用して驚異的並列シミュレーション作業をかなり行っています。これまで、ディスパッチノードと多数のワーカーノードを持つようにコードを構造化してきました。ディスパッチノードには、シミュレーターで実行されるパラメーターファイルのリストがあります。各ワーカーノードにパラメータファイルをシードします。ワーカーノードはシミュレーションを実行してから、ディスパッチノードが提供する別のパラメーターファイルを要求します。すべてのパラメータファイルが実行されると、ディスパッチノードは各ワーカーノードをシャットダウンしてから、自身をシャットダウンします。

パラメータファイルの名前は通常「Par_N.txt」です。ここで、Nは識別整数です(例:N = 1-1000)。そのため、カウンターを作成し、このカウンターをすべてのノード間で同期させることができれば、ディスパッチノードを用意する必要がなくなり、システムをもう少しシンプルにすることができると考えていました。これは理論的には単純に聞こえますが、実際には、変更中などにカウンターがロックされていることを確認する必要があるため、少し難しいと思います。MPIに組み込みの方法があるのではないかと考えました。対応しろ。何かご意見は?私はこれを考えすぎていますか?

4

4 に答える 4

10

共有カウンターを実装することは簡単ではありませんが、一度それを実行し、それをどこかのライブラリーに入れると、それを使って多くのことができるようになります。

このようなものを実装する場合に手渡さなければならない「MPI-2の使用」の本では、例の1つ(コードはオンラインで入手可能)は共有カウンターです。「スケーラブルでない」プロセスは、数十のプロセスでうまく機能するはずです。カウンターは、ランクごとに1つずつ、整数の0..size-1の配列であり、「次の作業項目を取得#」操作は次のように構成されます。ウィンドウをロックし、カウンターへの他の全員の貢献度(この場合、彼らが取ったアイテムの数)を読み取り、自分のアイテムを更新し(++)、ウィンドウを閉じ、合計を計算します。これはすべて、受動的な片側操作で行われます。(より適切なスケーリングでは、1-d配列ではなくツリーを使用します)。

したがって、ランク0でカウンターをホストし、作業がなくなるまで、全員が作業単位を実行し、次のカウンターを取得するためにカウンターを更新し続けるという使用法になります。次に、障壁か何かで待って、ファイナライズします。

このようなもの(共有値を使用して次の作業単位を利用可能にする)が機能するようになったら、より洗練されたアプローチに一般化できます。したがって、suzterpattが示唆しているように、最初に作業単位の「共有」を取っている人は誰でもうまく機能しますが、一部が他よりも早く終了した場合はどうすればよいでしょうか。現在の通常の答えは仕事を盗むことです。全員が作業単位のリストをデキューに保持し、作業がなくなると、残りの作業がなくなるまで、他の誰かのデキューのもう一方の端から作業単位を盗みます。これは実際には完全に分散されたバージョンのmaster-workerであり、単一のマスターパーティショニング作業はもうありません。単一の共有カウンターが機能するようになったら、それらからミューテックスを作成し、そこからデキューを実装できます。

更新: わかりました、これが共有カウンターを実行するためのハッキーな試みです-MPI-2本の単純なものの私のバージョン:は機能しているようですが、それよりもはるかに強力なことは何も言いません(長い間このようなもの)。単純なカウンター実装(MPI-2ブックの非スケーリングバージョンに対応)があり、2つの単純なテストがあります。1つはおおよそあなたの作業ケースに対応します。各アイテムは、カウンターを更新して作業アイテムを取得してから、「作業」を実行します(ランダムな時間スリープします)。各テストの最後に、各ランクが実行した増分の数であるカウンターデータ構造が出力されます。

#include <mpi.h>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>

struct mpi_counter_t {
    MPI_Win win;
    int  hostrank ;
    int  myval;
    int *data;
    int rank, size;
};

struct mpi_counter_t *create_counter(int hostrank) {
    struct mpi_counter_t *count;

    count = (struct mpi_counter_t *)malloc(sizeof(struct mpi_counter_t));
    count->hostrank = hostrank;
    MPI_Comm_rank(MPI_COMM_WORLD, &(count->rank));
    MPI_Comm_size(MPI_COMM_WORLD, &(count->size));

    if (count->rank == hostrank) {
        MPI_Alloc_mem(count->size * sizeof(int), MPI_INFO_NULL, &(count->data));
        for (int i=0; i<count->size; i++) count->data[i] = 0;
        MPI_Win_create(count->data, count->size * sizeof(int), sizeof(int),
                       MPI_INFO_NULL, MPI_COMM_WORLD, &(count->win));
    } else {
        count->data = NULL;
        MPI_Win_create(count->data, 0, 1,
                       MPI_INFO_NULL, MPI_COMM_WORLD, &(count->win));
    }
    count -> myval = 0;

    return count;
}

int increment_counter(struct mpi_counter_t *count, int increment) {
    int *vals = (int *)malloc( count->size * sizeof(int) );
    int val;

    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, count->hostrank, 0, count->win);

    for (int i=0; i<count->size; i++) {

        if (i == count->rank) {
            MPI_Accumulate(&increment, 1, MPI_INT, 0, i, 1, MPI_INT, MPI_SUM,
                           count->win);
        } else {
            MPI_Get(&vals[i], 1, MPI_INT, 0, i, 1, MPI_INT, count->win);
        }
    }

    MPI_Win_unlock(0, count->win);
    count->myval += increment;

    vals[count->rank] = count->myval;
    val = 0;
    for (int i=0; i<count->size; i++)
        val += vals[i];

    free(vals);
    return val;
}

void delete_counter(struct mpi_counter_t **count) {
    if ((*count)->rank == (*count)->hostrank) {
        MPI_Free_mem((*count)->data);
    }
    MPI_Win_free(&((*count)->win));
    free((*count));
    *count = NULL;

    return;
}

void print_counter(struct mpi_counter_t *count) {
    if (count->rank == count->hostrank) {
        for (int i=0; i<count->size; i++) {
            printf("%2d ", count->data[i]);
        }
        puts("");
    }
}

int test1() {
    struct mpi_counter_t *c;
    int rank;
    int result;

    c = create_counter(0);

    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    result = increment_counter(c, 1);
    printf("%d got counter %d\n", rank, result);

    MPI_Barrier(MPI_COMM_WORLD);
    print_counter(c);
    delete_counter(&c);
}


int test2() {
    const int WORKITEMS=50;

    struct mpi_counter_t *c;
    int rank;
    int result = 0;

    c = create_counter(0);

    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    srandom(rank);

    while (result < WORKITEMS) {
        result = increment_counter(c, 1);
        if (result <= WORKITEMS) {
             printf("%d working on item %d...\n", rank, result);
             sleep(random() % 10);
         } else {
             printf("%d done\n", rank);
         }
    }

    MPI_Barrier(MPI_COMM_WORLD);
    print_counter(c);
    delete_counter(&c);
}

int main(int argc, char **argv) {

    MPI_Init(&argc, &argv);

    test1();
    test2();

    MPI_Finalize();
}
于 2011-02-10T18:15:51.527 に答える
3

その問題を解決するための組み込みのメカニズムは考えられません。手動で実装する必要があります。コメントから判断すると、プログラムを分散化する必要があります。その場合、各プロセス(または少なくともプロセスのグループ)は、カウンターの独自の値を保持し、同期を維持する必要があります。これは、非ブロッキングの送信/受信を巧妙に使用することでおそらく実行できますが、それらのセマンティクスは簡単ではありません。

代わりに、ワーカープロセスに一度に複数のファイルを発行するだけで、飽和の問題を解決します。これにより、ネットワークトラフィックが減少し、単純な単一のディスパッチャの設定を維持できます。

于 2011-02-09T18:45:39.947 に答える
0

厳密な順序でファイルを調べる必要があるかどうかは明らかではありません。そうでない場合は、各ノードiですべてのファイルを処理するだけでなく、N % total_workers == iつまり、作業の循環分散を行うのはなぜですか?

于 2011-02-10T20:35:06.973 に答える
0

ディスパッチノードを使用して動的な負荷分散を行っているようです(プロセッサが利用可能になったときにプロセッサに作業を割り当てます)。すべてのプロセッサを停止する必要のない共有カウンタは、停止しません。私はあなたが今持っているものにとどまるか、suszterpattが提案することをして、一度にファイルのバッチを送ることをお勧めします。

于 2011-02-10T02:27:58.283 に答える