1

オールツーワンの通信を順不同で行おうとしています。基本的に、整数IDで識別される、同じサイズの複数の浮動小数点配列があります。

各メッセージは次のようになります。

<int id><float array data>

受信側では、アレイがいくつあるかを正確に把握しているため、正確な数の受信を設定します。メッセージを受信すると、IDを解析し、データを適切な場所に配置します。問題は、メッセージが他のプロセスから受信プロセスに送信される可能性があることです。(たとえば、プロデューサーにはワークキュー構造があり、キューで使用可能なIDを処理します。)

MPIは注文配信でP2Pのみを保証するため、整数IDとFPデータを2つのメッセージに簡単に入れることはできません。そうしないと、受信者がIDとデータを照合できない可能性があります。MPIでは、1回の送信で2種類のデータを使用することもできません。

私は2つのアプローチしか考えられません。

1)受信者はサイズm(source [m])の配列を持ち、mは送信ノードの数です。送信者は最初にIDを送信し、次にデータを送信します。受信者は、送信者iから整数メッセージを受信した後、idをsource[i]に保存します。送信者iからFP配列を受信すると、source [i]をチェックし、IDを取得して、データを適切な場所に移動します。MPIが順序どおりのP2P通信を保証するために機能します。受信者は、送信者ごとに状態情報を保持する必要があります。さらに悪いことに、単一の送信プロセスでデータの前に2つのIDを送信できる場合(マルチスレッドなど)、このメカニズムは機能しません。

2)idとFPをバイトとして扱い、それらを送信バッファーにコピーします。それらをMPI_CHARとして送信すると、レシーバーはそれらを整数とFP配列にキャストバックします。次に、送信側のバイトバッファにコピーするための追加コストを支払う必要があります。MPIプロセス内のスレッドの数が増えると、一時バッファーの合計も増えます。

どちらも完璧なソリューションではありません。プロセス内で何もロックしたくありません。もっと良い提案がありますか?

編集:コードは、infinibandを使用する共有クラスターで実行されます。マシンはランダムに割り当てられます。したがって、TCPソケットがここで私を助けることはできないと思います。さらに、IPoIBは高価に見えます。通信には40Gbpsのフルスピードが必要で、CPUが計算を続けます。

4

2 に答える 2

4

receive関数でソースランクとして指定MPI_ANY_SOURCEし、タグを使用してメッセージを並べ替えることができます。これは、カスタムメッセージを作成するよりも簡単です。簡単な例を次に示します。

#include <stdio.h>
#include "mpi.h"

int main() {
    MPI_Init(NULL,NULL);
    int rank=0;
    int size=1;
    MPI_Comm_rank(MPI_COMM_WORLD,&rank);
    MPI_Comm_size(MPI_COMM_WORLD,&size);

    // Receiver is the last node for simplicity in the arrays
    if (rank == size-1) {
        // Receiver has size-1 slots
        float data[size-1];
        MPI_Request request[size-1];

        // Use tags to sort receives
        for (int tag=0;tag<size-1;++tag){
            printf("Receiver for id %d\n",tag);
            // Non-blocking receive
            MPI_Irecv(data+tag,1,MPI_FLOAT,
                      MPI_ANY_SOURCE,tag,MPI_COMM_WORLD,&request[tag]);
        }

        // Wait for all requests to complete
        printf("Waiting...\n");
        MPI_Waitall(size-1,request,MPI_STATUSES_IGNORE);
        for (size_t i=0;i<size-1;++i){
            printf("%f\n",data[i]);
        }
    } else {
        // Producer
        int id = rank;
        float data = rank;
        printf("Sending {%d}{%f}\n",id,data);
        MPI_Send(&data,1,MPI_FLOAT,size-1,id,MPI_COMM_WORLD);
    }

    return MPI_Finalize();
}
于 2012-10-03T06:51:56.363 に答える
3

誰かがすでに書いたように、あなたはMPI_ANY_SOURCEどんなソースからでも受け取るために使うことができます。1回の送信で2種類のデータを送信するには、派生データ型を使用できます。

#include <stdio.h>
#include <stdlib.h>
#include "mpi.h"

#define asize 10

typedef struct data_ {
  int   id;
  float array[asize];
} data;

int main() {

  MPI_Init(NULL,NULL);

  int rank = -1;
  int size = -1;
  MPI_Comm_rank(MPI_COMM_WORLD,&rank);
  MPI_Comm_size(MPI_COMM_WORLD,&size);

  data buffer;    
 // Define and commit a new datatype
  int          blocklength [2];
  MPI_Aint     displacement[2];
  MPI_Datatype datatypes   [2];
  MPI_Datatype mpi_tdata;

  MPI_Aint     startid,startarray;
  MPI_Get_address(&(buffer.id),&startid);
  MPI_Get_address(&(buffer.array[0]),&startarray);

  blocklength [0] = 1;
  blocklength [1] = asize;
  displacement[0] = 0;
  displacement[1] = startarray - startid;
  datatypes   [0] = MPI_INT;
  datatypes   [1] = MPI_FLOAT;

  MPI_Type_create_struct(2,blocklength,displacement,datatypes,&mpi_tdata);
  MPI_Type_commit(&mpi_tdata);

  if (rank == 0) {
    int        count = 0;
    MPI_Status status;

    while (count < size-1 ) {
      // Non-blocking receive
      printf("Receiving message %d\n",count);
      MPI_Recv(&buffer,1,mpi_tdata,MPI_ANY_SOURCE,0,MPI_COMM_WORLD,&status);
      printf("Message tag %d, first entry %g\n",buffer.id,buffer.array[0]);
      // Counting the received messages 
      count++;
    }

  } else {
    // Initialize buffer to be sent
    buffer.id = rank;
    for (int ii = 0; ii < size; ii++) {
      buffer.array[ii] = 10*rank + ii;
    }
    // Send buffer
    MPI_Send(&buffer,1,mpi_tdata,0,0,MPI_COMM_WORLD);
  }

  MPI_Type_free(&mpi_tdata);

  MPI_Finalize();
  return 0;
}
于 2012-10-03T08:49:49.857 に答える