2

複数のスレーブノードと1つのマスターノードを含むプロジェクトに取り組んでいます。ある時点で、さまざまなスレーブノード(マスターノードもスレーブノードとして扱うことができます)からマスターノードにデータを収集する必要があります。データはどのタイプでもかまいませんが、unsignedintであると仮定しましょう。そして、それがスレーブノードでのデータの見え方です。

node0:| branch01 | branch02 | branch03 | branch04|...。

node1:| Chunk11 | Chunk12 | Chunk13 | Chunk14|...。

..。

noden:| chunkn1 | chunkn2 | branchn3 | chunkn4|...。

データはすべてnode0に収集され、次のようになります。

node0:| branch01 | branch11 | branch21 | .... | branchn1 | branch02 | branch12 | ... | branchn2 | ... | branchnm |

つまり、各ノードの最初のチャンクを連結し、次に各ノードの2番目のチャンクを連結します...

MPI_Gathervを使用してこれを実装する方法がわかりません。これは、chunkijごとにサイズが異なり、各ノードは独自のチャンクサイズと開始インデックスのみを認識し、他のノードの情報は認識しないためです。

私はMPIにあまり詳しくないので、さまざまなノードから1つのノードにさまざまなサイズのデータ​​を収集できるAPIはありますか?

4

1 に答える 1

1

これは、機能するはずの編集可能な例です。問題を解決するための最適な方法ではないことはほぼ間違いありません。それについてコメントするには、コードの詳細が必要です。コンパイルできるかどうかは確認していませんが、タイプミスを修正していただければ、喜んで未解決のバグを修正します。

また、あなたにとって効率がどれほど重要かはわかりません。この操作は 1 秒あたり何百回も実行されるのでしょうか、それとも 1 日に 1 回実行されるのでしょうか? 後者の場合、このコードはおそらく問題ありません。C/C++も想定しています。

// Populate this on each node from MPI_Comm_rank.
int myRank; 
// Populate this on each node from MPI_Comm_size.
int P; 
// Num chunks per core.
const int M = 4;  

// I'm assuming 0 is the master.
int masterNodeRank = 0; 

// Populate this. 
// It only needs to have meaningful data on the master node. 
//If master node doesn't have the data, fill with MPI_GATHER.
int* sizeOfEachChunkOnEachRank[M]; 
// Populate this. 
//It needs to exist on every 'slave' node.
int sizeOfMyChunks[M]; 

// Assuming you already have this array
// it should be the contiguous store of each core's data.
unsigned* myData; 
// This is what we'll gather all the data into on master node only.
unsigned* gatheredData = new unsigned[totalDataSize];
// This array will keep all of the displacements from each sending node.
int* displacements = new int[P];

// This keeps track of how many unsigneds we've received so far.
int totalCountSoFar = 0;

// We'll work through all the first chunks on each node at once, then all
// the second chunks, etc.
for(int localChunkNum = 0; localChunkNum < M; ++localChunkNum)
{
  // On the receiving node we need to calculate all the displacements
  // for the received data to go into the array
  if (myRank == masterNodeRank)
  {
    displacements[0] = 0;

    for(int otherCore = 1; otherCore < P; ++otherCore)
    {
      displacements[otherCore] = displacements[otherCore-1] + sizeOfEachChunkOnEachRank[localChunkNum][otherCore-1];
    }
  }

  // On all cores, we'll need to calculate how far into our local array
  // to start the sending from.      
  int myFirstIndex = 0;

  for(int previousChunk=0; previousChunk < localChunkNum; previousChunk++)
  {
    myFirstIndex += sizeOfMyChunks[previousChunk];
  }

  // Do the variable gather
  MPI_Gatherv(&myData[myFirstIndex], // Start address to send from
              sizeOfMyChunks[localChunkNum], // Number to send
              MPI_UNSIGNED, // Type to send
              &gatheredData[totalCountSoFar], // Start address to receive into
              sizeOfEachChunkOnEachRank[localChunkNum], // Number expected from each core
              displacements, // Displacements to receive into from each core
              MPI_UNSIGNED, // Type to receive
              masterNodeRank, // Receiving core rank
              MPI_COMM_WORLD); // MPI communicator.

  // If this is the receiving rank, update the count we've received so far
  // so that we don't overwrite data the next time we do the gather.
  // Note that the total received is the displacement to the receive from the
  // last core + the total received from that core.
  if(myRank == masterNodeRank)
  {
    totalCountSoFar += displacements[P-1] + sizeOfEachChunkOnEachRank[localChunkNum][P-1];
  }
}

delete[] displacements;
delete[] gatheredData;
于 2012-07-02T13:44:24.323 に答える