Scotty
Scotty

Reputation: 43

MPI_Unpack inside a user-defined operation of MPI_Reduce()

I have to send a struct that contains, among other things, a dynamically allocated array of another struct. The receiver has to merge the received message with its data and then send the result to another process.

Basically, what I want to obtain is something like this:

enter image description here

I have implemented the following code.

#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <mpi.h>

/* One dictionary entry: a character together with its frequency count. */
typedef struct Data {
  char character;  /* the character this entry describes */
  int frequency;   /* count for that character; mergeDicts sums it on a match */
} Data;

/* A variable-length collection of Data entries. */
typedef struct Dictionary {
  int charsNr;  /* number of entries currently stored in data */
  Data *data;   /* heap-allocated array of charsNr entries (grown via realloc in appendData) */
} Dictionary;

/* Bookkeeping passed to packDictionary / unpackDictionary. */
typedef struct Header {
    int id;              /* set to 0 wherever a Header is created; not read by the functions shown here */
    int size;            /* buffer size in bytes handed to MPI_Pack / MPI_Unpack */
    MPI_Datatype *type;  /* points to the MPI datatype used for the Data array */
    int position;        /* position argument updated by MPI_Pack / MPI_Unpack */
} Header;

/* Number of entries each process generates for its demo dictionary in main. */
static const int NUM_CHARS = 5;
/* Byte type used for the packed message buffers. */
typedef unsigned char BYTE;

/**
 * Builds and commits the MPI datatype describing one Data element.
 *
 * The function is named buildMyType because that is the name main calls.
 * The struct type is resized so that its extent equals sizeof(Data); this
 * keeps consecutive elements of a Data array correctly strided whatever
 * padding the compiler inserts.
 *
 * @param dictType  out: receives the committed datatype. The caller releases
 *                  it with MPI_Type_free.
 */
void buildMyType(MPI_Datatype *dictType) {
  int blockLengths[] = {1, 1};
  MPI_Datatype types[] = {MPI_CHAR, MPI_INT};
  MPI_Aint offsets[] = {offsetof(Data, character), offsetof(Data, frequency)};
  MPI_Datatype rawType;

  MPI_Type_create_struct(2, blockLengths, offsets, types, &rawType);
  MPI_Type_create_resized(rawType, 0, (MPI_Aint) sizeof(Data), dictType);
  /* The resized type keeps its own reference; the intermediate one can go. */
  MPI_Type_free(&rawType);
  MPI_Type_commit(dictType);
}

/**
 * Appends a copy of *data to the end of dict, growing dict->data by one slot.
 *
 * @param dict  dictionary to extend; dict->data may be NULL when charsNr is 0.
 * @param data  entry to copy. It may point into dict->data itself.
 *
 * On allocation failure the process is terminated, because the function has
 * no way to report an error and silently dropping an entry would corrupt the
 * merged result.
 */
void appendData(Dictionary *dict, Data *data) {
  /* Copy first: realloc may move the array that data points into. */
  const Data entry = *data;

  /* Keep the old pointer until realloc succeeds so it is never lost. */
  Data *grown = realloc(dict->data, sizeof *grown * ((size_t) dict->charsNr + 1));
  if (grown == NULL) {
    fprintf(stderr, "appendData: out of memory\n");
    abort();
  }

  grown[dict->charsNr] = entry;
  dict->data = grown;
  ++dict->charsNr;
}

/**
 * Merges every entry of src into dst.
 *
 * For each src entry, the first dst entry with the same character has the
 * src frequency added to it; if no dst entry matches, the src entry is
 * appended to dst through appendData.
 *
 * @param dst  dictionary that receives the merged result (may grow).
 * @param src  dictionary whose entries are merged in; it is not modified.
 */
void mergeDicts(Dictionary *dst, Dictionary *src) {
  for (int i = 0; i < src->charsNr; i++) {
    Data *incoming = &src->data[i];

    /* Linear search; match == dst->charsNr afterwards means "not found". */
    int match = 0;
    while (match < dst->charsNr &&
           dst->data[match].character != incoming->character) {
      match++;
    }

    if (match < dst->charsNr) {
      dst->data[match].frequency += incoming->frequency;
    } else {
      appendData(dst, incoming);
    }
  }
}

/*
 * Returns a pseudo-random integer in the closed range [from, to],
 * drawn with a single call to rand().
 */
int getRand(const int from, const int to)
{
  const int span = to - from + 1;
  return from + rand() % span;
}

/*
 * Probes for a message from `rank` with `tag` on MPI_COMM_WORLD and stores
 * its element count, measured in MPI_BYTE units, in *size.
 *
 * size    out: receives the count reported by MPI_Get_count.
 * rank    source rank passed to MPI_Probe.
 * tag     message tag passed to MPI_Probe.
 * status  out: filled by MPI_Probe and then read by MPI_Get_count.
 */
void getMessageSize(int *size, int rank, int tag, MPI_Status *status) {
    MPI_Probe(rank, tag, MPI_COMM_WORLD, status);
    MPI_Get_count(status, MPI_BYTE, size);
}

/**
 * Packs a dictionary (entry count followed by the entries) into a newly
 * allocated buffer using MPI_Pack.
 *
 * @param header  in: header->type must point to the datatype of one Data.
 *                out: header->size is the allocated buffer size and
 *                header->position the number of bytes actually packed.
 * @param dict    dictionary to serialise.
 * @return        malloc'ed buffer owned by the caller (release with free),
 *                or NULL if header->type is NULL or the allocation fails.
 */
BYTE* packDictionary(Header *header, Dictionary *dict) {
    if (header->type == NULL) {
        return NULL;
    }

    /* Ask MPI for the upper bound instead of guessing from sizeof: the
     * packed representation may carry extra bytes. */
    int countBytes = 0;
    int dataBytes = 0;
    MPI_Pack_size(1, MPI_INT, MPI_COMM_WORLD, &countBytes);
    MPI_Pack_size(dict->charsNr, *header->type, MPI_COMM_WORLD, &dataBytes);

    header->size = countBytes + dataBytes;
    header->position = 0;

    BYTE *buffer = malloc((size_t) header->size);
    if (buffer == NULL) {
        return NULL;
    }

    MPI_Pack(&dict->charsNr, 1, MPI_INT, buffer, header->size, &header->position, MPI_COMM_WORLD);
    MPI_Pack(dict->data, dict->charsNr, *header->type, buffer, header->size, &header->position, MPI_COMM_WORLD);

    return buffer;
}

/**
 * Unpacks a dictionary (entry count followed by the entries) from a buffer
 * produced with MPI_Pack.
 *
 * @param header  in: header->size is the buffer size in bytes and
 *                header->type points to the datatype of one Data.
 *                out: header->position is the number of bytes consumed.
 * @param dict    out: receives the entries in a freshly malloc'ed array that
 *                the caller releases with free; left empty (0 / NULL) when
 *                the buffer holds no entries.
 * @param buffer  packed bytes; reading always starts at offset 0.
 */
void unpackDictionary(Header *header, Dictionary *dict, BYTE *buffer) {
    /* Always start at the beginning of the buffer, so a header that was
     * just used for another pack/unpack can be reused safely. */
    header->position = 0;
    dict->charsNr = 0;
    dict->data = NULL;

    int count = 0;
    MPI_Unpack(buffer, header->size, &header->position, &count, 1, MPI_INT, MPI_COMM_WORLD);
    if (count <= 0) {
        return;  /* empty (or corrupt) payload: nothing to allocate */
    }

    Data *entries = malloc(sizeof *entries * (size_t) count);
    if (entries == NULL) {
        fprintf(stderr, "unpackDictionary: out of memory\n");
        MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
        return;
    }

    MPI_Unpack(buffer, header->size, &header->position, entries, count, *header->type, MPI_COMM_WORLD);

    dict->data = entries;
    dict->charsNr = count;
}

/**
 * User-defined reduction operator: merges packed dictionaries.
 *
 * Every element of `in` and `out` is a fixed-capacity byte buffer holding one
 * dictionary packed as "entry count, then entries". For each element the two
 * dictionaries are unpacked, merged, and the result is packed back into the
 * corresponding `out` element.
 *
 * MPI_Reduce cannot grow its buffers, so the element type must be large
 * enough to hold the fully merged dictionary; the process aborts otherwise.
 *
 * @param in       first operand array (read only).
 * @param out      second operand array; overwritten with the merged result.
 * @param len      number of elements in each array.
 * @param typeptr  datatype of one element, assumed to be a contiguous run of
 *                 MPI_PACKED bytes.
 */
void MyTypeOp(void *in, void *out, int *len, MPI_Datatype *typeptr)
{
  /* The buffer size is not passed explicitly: it is the size of the
   * element datatype handed to the operator. */
  int elemBytes = 0;
  MPI_Type_size(*typeptr, &elemBytes);

  /* Distance between consecutive elements in the operand arrays. */
  MPI_Aint lowerBound = 0;
  MPI_Aint stride = 0;
  MPI_Type_get_extent(*typeptr, &lowerBound, &stride);

  /* The operator receives no user data, so the datatype of one Data entry
   * is rebuilt locally with the same builder main uses. */
  MPI_Datatype dictType;
  buildMyType(&dictType);

  BYTE *inBytes = in;
  BYTE *outBytes = out;

  for (int i = 0; i < *len; i++) {
    BYTE *inElem = inBytes + (size_t) i * (size_t) stride;
    BYTE *outElem = outBytes + (size_t) i * (size_t) stride;

    Dictionary inDict = {.charsNr = 0, .data = NULL};
    Dictionary outDict = {.charsNr = 0, .data = NULL};
    Header header = {.id = 0, .size = elemBytes, .type = &dictType, .position = 0};

    unpackDictionary(&header, &inDict, inElem);

    /* Rewind before reading the second, independent buffer. */
    header.size = elemBytes;
    header.position = 0;
    unpackDictionary(&header, &outDict, outElem);

    mergeDicts(&outDict, &inDict);

    /* packDictionary allocates its own buffer and leaves the number of
     * packed bytes in header.position; copy them into the fixed-size
     * output element, which is where MPI expects the result. */
    BYTE *packed = packDictionary(&header, &outDict);
    if (packed == NULL || header.position > elemBytes) {
      fprintf(stderr, "MyTypeOp: merged dictionary does not fit in %d bytes\n", elemBytes);
      MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
      return;
    }
    memcpy(outElem, packed, (size_t) header.position);

    free(packed);
    free(inDict.data);
    free(outDict.data);
  }

  MPI_Type_free(&dictType);
}

int main(int argc, char **argv)
{  
  int proc_number;
  int rank;

  MPI_Comm_size(MPI_COMM_WORLD, &proc_number);
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);

  Dictionary dict = {.charsNr = NUM_CHARS, .data = NULL};
  dict.data = malloc(dict.charsNr * sizeof(Data));

  // create a random dictionary
  // I use for simplicity NUM_CHARS but in real life it will be unknown
  // at the beginning and every dictionary can have a different size
  for (int i = 0; i < NUM_CHARS; i++) {
    int freq = getRand(1, 10);
    dict.data[i].frequency = freq;
    dict.data[i].character = 'a' + getRand(1, 5) + i + rank;
  }

  MPI_Datatype dictType;
  buildMyType(&dictType);

  MPI_Op MyOp;
  MPI_Op_create((MPI_User_function *) MyTypeOp, 1, &MyOp);
  
  MPI_Datatype contType;
  MPI_Type_contiguous(1, MPI_PACKED, &contType);
  MPI_Type_commit(&contType);

  Header header = {.id = 0, .size = 0, .type = &dictType, .position = 0};
 
  // when I pack the message I put everithing inside a buffer of BYTE
  BYTE *buffer = packDictionary(&header, &dict);

  BYTE *buffer_rcv = NULL;

  MPI_Reduce(&buffer,& buffer_rcv, 1, contType, MyOp, 0, MPI_COMM_WORLD);

  if(rank == 0)
    for (i = 0; i < NUM_CHARS; i++)
      printf("%c: %d\n", dict.data[i].character, dict.data[i].frequency);

  MPI_Type_free(&contType);
  MPI_Type_free(&dictType);
  MPI_Op_free(&MyOp);

  free(buffer);
  free(buffer_rcv);
  free(dict.data);

  MPI_Finalize();

  return 0;
}

Of course this example cannot run.

Do you have any suggestions on how I can do it?

I'm writing the code in C on a Linux machine.

Thanks.

Upvotes: 0

Views: 47

Answers (0)

Related Questions