gpunerd

Reputation: 151

Gathering data from different nodes using MPI and C++

I'm working on a project that has several slave nodes and one master node. At some point I need to gather data from the different slave nodes (the master node can be treated as a slave node too) onto the master node. The data can be of any type, but let's suppose it is unsigned int. This is how the data looks on the slave nodes:

node0: |chunk01|chunk02|chunk03|chunk04|....

node1: |chunk11|chunk12|chunk13|chunk14|....

...

noden: |chunkn1|chunkn2|chunkn3|chunkn4|....

The data should be all gathered to node0 and look like this:

node0: |chunk01|chunk11|chunk21|....|chunkn1|chunk02|chunk12|...|chunkn2|...|chunknm|

That is, we concatenate the first chunk from every node, then the second chunk from every node, and so on.
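To pin down the target ordering, here is a small serial C++ sketch (no MPI involved) that builds node0's final buffer from every node's chunks. The nested-vector representation and the name `interleaveChunks` are hypothetical, chosen only for illustration, and it assumes every node holds the same number of chunks.

```cpp
#include <cstddef>
#include <vector>

using Chunk = std::vector<unsigned>;

// Hypothetical helper: nodes[i][j] is chunk j on node i; chunks may differ in size.
// Returns the layout wanted on node0: chunk 0 of every node, then chunk 1 of
// every node, and so on. Assumes every node holds the same number of chunks.
std::vector<unsigned> interleaveChunks(const std::vector<std::vector<Chunk>>& nodes)
{
    std::vector<unsigned> result;
    if (nodes.empty()) return result;

    const std::size_t numChunks = nodes[0].size();
    for (std::size_t j = 0; j < numChunks; ++j)   // chunk index (outer)
        for (const auto& node : nodes)            // node index (inner)
            result.insert(result.end(), node[j].begin(), node[j].end());
    return result;
}
```

For two nodes holding `{1, 2}, {3}` and `{10}, {11, 12, 13}`, this produces `1, 2, 10, 3, 11, 12, 13`: the chunk index varies slowest, the node index fastest.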

I don't know how to use MPI_Gatherv to implement this, because each chunkij has a different size, and each node only knows its own chunk sizes and start indices, not the other nodes' information.

I'm not very familiar with MPI, so I'm wondering: is there an API that can gather data of different sizes from various nodes onto one node?
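MPI_Gatherv does let each rank contribute a different number of elements, but the root must pass it the count for every rank. Since each node only knows its own sizes, a common first step is to collect them with a plain MPI_Gather, where every rank sends the same number of ints (its M chunk sizes). On the root, a call like `MPI_Gather(sizeOfMyChunks, M, MPI_INT, allSizes, M, MPI_INT, 0, MPI_COMM_WORLD)` stores them rank-major, so `allSizes[r*M + j]` is the size of chunk j on rank r. The sketch below shows only the local index bookkeeping that turns that buffer into one row of counts per chunk index; `chunkMajorSizes`, `allSizes`, `sizeOfMyChunks` and `M` are hypothetical names, and it assumes every node has the same number of chunks.

```cpp
#include <vector>

// Hypothetical helper. allSizes is the rank-major buffer that
//   MPI_Gather(sizeOfMyChunks, M, MPI_INT, allSizes, M, MPI_INT, 0, MPI_COMM_WORLD)
// would fill on the root: allSizes[r*M + j] is the size of chunk j on rank r.
// Returns table[j][r]: for each chunk index j, the per-rank counts, which is
// the shape MPI_Gatherv's recvcounts argument needs when gathering chunk j.
std::vector<std::vector<int>> chunkMajorSizes(const std::vector<int>& allSizes, int P, int M)
{
    std::vector<std::vector<int>> table(M, std::vector<int>(P));
    for (int r = 0; r < P; ++r)
        for (int j = 0; j < M; ++j)
            table[j][r] = allSizes[r * M + j];
    return table;
}
```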

Upvotes: 2

Views: 1685

Answers (1)

hcarver

Reputation: 7214

Here's an example that should work and that you can adapt. It is almost certainly not the optimal way to solve the problem; I'd need more details of your code to comment on that. I haven't checked whether it compiles, but if you fix any typos, I'm happy to try to fix any outstanding bugs.

I also don't know how important efficiency is to you: is this operation going to run hundreds of times per second, or once a day? If it's the latter, this code is probably fine. I'm also assuming C/C++.
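If efficiency does matter, one alternative (not what the code below does) is to issue a single MPI_Gatherv in which each rank sends its whole contiguous buffer, and then reorder locally on the master. After such a gather the master holds rank-major data: all of rank 0's chunks, then all of rank 1's, and so on. The hypothetical function `reorderToChunkMajor` sketches that local reorder, assuming the master knows `sizes[j][r]`, the length of chunk j on rank r.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical helper. rankMajor holds the result of one gather in rank order:
// rank 0's chunks 0..M-1, then rank 1's chunks 0..M-1, and so on.
// sizes[j][r] is the length of chunk j on rank r (the master must know these).
// Returns the chunk-major layout: chunk 0 of every rank, then chunk 1, ...
std::vector<unsigned> reorderToChunkMajor(const std::vector<unsigned>& rankMajor,
                                          const std::vector<std::vector<int>>& sizes)
{
    const std::size_t M = sizes.size();
    const std::size_t P = M ? sizes[0].size() : 0;

    // offset[r][j]: where chunk j of rank r starts inside rankMajor.
    std::vector<std::vector<std::size_t>> offset(P, std::vector<std::size_t>(M));
    std::size_t pos = 0;
    for (std::size_t r = 0; r < P; ++r)
        for (std::size_t j = 0; j < M; ++j) {
            offset[r][j] = pos;
            pos += static_cast<std::size_t>(sizes[j][r]);
        }

    // Copy the blocks out again, chunk index slowest, rank index fastest.
    std::vector<unsigned> out;
    out.reserve(rankMajor.size());
    for (std::size_t j = 0; j < M; ++j)
        for (std::size_t r = 0; r < P; ++r) {
            auto first = rankMajor.begin() + static_cast<std::ptrdiff_t>(offset[r][j]);
            out.insert(out.end(), first, first + sizes[j][r]);
        }
    return out;
}
```

This trades M collective calls for one, at the cost of an extra copy on the master; which version is faster will depend on M, the message sizes and the MPI implementation, so it is worth measuring.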

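#include <mpi.h>

// Populate this on each node from MPI_Comm_rank.
int myRank;
// Populate this on each node from MPI_Comm_size.
int P;
// Num chunks per core.
const int M = 4;

// I'm assuming 0 is the master.
const int masterNodeRank = 0;

// Populate this: sizeOfEachChunkOnEachRank[j][r] is the size of chunk j on rank r.
// It only needs to have meaningful data on the master node.
// If the master node doesn't have the data, fill it with MPI_Gather.
// Zero-initialised so non-master ranks pass null recvcounts (ignored off the root).
int* sizeOfEachChunkOnEachRank[M] = {};
// Populate this.
// It needs to exist on every 'slave' node.
int sizeOfMyChunks[M];

// Assuming you already have this array;
// it should be the contiguous store of each core's data.
unsigned* myData;

// Total number of unsigneds the master will receive (only needed on the master).
int totalDataSize = 0;
if (myRank == masterNodeRank)
{
  for (int chunk = 0; chunk < M; ++chunk)
    for (int rank = 0; rank < P; ++rank)
      totalDataSize += sizeOfEachChunkOnEachRank[chunk][rank];
}

// This is what we'll gather all the data into, on the master node only.
unsigned* gatheredData = (myRank == masterNodeRank) ? new unsigned[totalDataSize] : nullptr;
// This array will keep all of the displacements from each sending node.
int* displacements = new int[P];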

// This keeps track of how many unsigneds we've received so far.
int totalCountSoFar = 0;

// We'll work through all the first chunks on each node at once, then all
// the second chunks, etc.
for(int localChunkNum = 0; localChunkNum < M; ++localChunkNum)
{
  // On the receiving node we need to calculate all the displacements
  // for the received data to go into the array
  if (myRank == masterNodeRank)
  {
    displacements[0] = 0;

    for(int otherCore = 1; otherCore < P; ++otherCore)
    {
      displacements[otherCore] = displacements[otherCore-1] + sizeOfEachChunkOnEachRank[localChunkNum][otherCore-1];
    }
  }

  // On all cores, we'll need to calculate how far into our local array
  // to start the sending from.      
  int myFirstIndex = 0;

  for(int previousChunk=0; previousChunk < localChunkNum; previousChunk++)
  {
    myFirstIndex += sizeOfMyChunks[previousChunk];
  }

  // Do the variable gather
  MPI_Gatherv(&myData[myFirstIndex], // Start address to send from
              sizeOfMyChunks[localChunkNum], // Number to send
              MPI_UNSIGNED, // Type to send
              gatheredData + totalCountSoFar, // Start address to receive into (only used on the master)
              sizeOfEachChunkOnEachRank[localChunkNum], // Number expected from each core
              displacements, // Displacements to receive into from each core
              MPI_UNSIGNED, // Type to receive
              masterNodeRank, // Receiving core rank
              MPI_COMM_WORLD); // MPI communicator.

  // If this is the receiving rank, update the count we've received so far
  // so that we don't overwrite data the next time we do the gather.
  // Note that the total received is the displacement to the receive from the
  // last core + the total received from that core.
  if(myRank == masterNodeRank)
  {
    totalCountSoFar += displacements[P-1] + sizeOfEachChunkOnEachRank[localChunkNum][P-1];
  }
}

delete[] displacements;
delete[] gatheredData;
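To double-check the displacement bookkeeping in the loop above without launching MPI, the arithmetic can be lifted into a plain function. `absoluteOffsets` is a hypothetical name; it repeats the answer's displacement formula and the `totalCountSoFar` update, and returns where each rank's piece of each chunk lands in `gatheredData`. It assumes P >= 1 and that `sizes[j][r]` is laid out like `sizeOfEachChunkOnEachRank`.

```cpp
#include <vector>

// Hypothetical helper mirroring the answer's loop. For each chunk index j it
// computes the displacements passed to MPI_Gatherv (relative to the address
// gatheredData + totalCountSoFar), records the absolute start of every rank's
// block, then advances totalCountSoFar exactly as the answer does.
// sizes[j][r] is the size of chunk j on rank r; assumes P >= 1.
std::vector<int> absoluteOffsets(const std::vector<std::vector<int>>& sizes, int P)
{
    std::vector<int> starts;
    std::vector<int> displacements(P);
    int totalCountSoFar = 0;

    for (const auto& chunkSizes : sizes) {
        displacements[0] = 0;
        for (int r = 1; r < P; ++r)
            displacements[r] = displacements[r - 1] + chunkSizes[r - 1];

        for (int r = 0; r < P; ++r)
            starts.push_back(totalCountSoFar + displacements[r]);

        totalCountSoFar += displacements[P - 1] + chunkSizes[P - 1];
    }
    return starts;
}
```

For sizes `{{2, 1}, {1, 3}}` (two ranks, two chunks) the starts are 0, 2, 3 and 4 and the final total is 7, so the blocks tile the receive buffer with no gaps or overlaps.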

Upvotes: 1
