Reputation: 43
I have to send a struct that contains, among other things, a dynamically allocated array of another struct. The receiver has to merge the received message with its data and then send the result to another process.
Basically, what I want to obtain is something like this.
I have implemented the following code.
#include <limits.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <mpi.h>
/* One dictionary entry: a character together with its frequency count. */
typedef struct Data {
/* Key of the entry; mergeDicts() matches entries on this field. */
char character;
/* Count associated with `character`; summed when entries are merged. */
int frequency;
} Data;
/* A growable collection of Data entries. */
typedef struct Dictionary {
/* Number of valid entries stored in `data`. */
int charsNr;
/* Heap-allocated array of `charsNr` entries; NULL while the dictionary is empty. */
Data *data;
} Dictionary;
/* Bookkeeping shared by packDictionary() and unpackDictionary(). */
typedef struct Header {
/* Initialised to 0 by the callers in this file; not read by any function here. */
int id;
/* Size in bytes of the packed buffer handed to MPI_Pack/MPI_Unpack. */
int size;
/* Points to the MPI datatype that describes one Data element. */
MPI_Datatype *type;
/* Current byte offset inside the packed buffer; advanced by MPI_Pack/MPI_Unpack. */
int position;
} Header;
/* Number of entries each process generates for its sample dictionary in main(). */
static const int NUM_CHARS = 5;
/* Raw byte type used for packed message buffers. */
typedef unsigned char BYTE;
/*
 * Build and commit the MPI datatype that describes one `Data` element.
 *
 * dictType: out-parameter receiving the committed datatype. The caller owns
 *           it and must release it with MPI_Type_free().
 *
 * The function is named buildMyType because that is the name its caller uses.
 *
 * The struct type is resized so that its extent equals sizeof(Data). This
 * guarantees that element i of a `Data` array is found at i * sizeof(Data)
 * when an array is packed or sent, whatever padding the compiler inserts.
 */
void buildMyType(MPI_Datatype *dictType) {
    int blockLengths[] = {1, 1};
    MPI_Datatype types[] = {MPI_CHAR, MPI_INT};
    MPI_Aint offsets[] = {offsetof(Data, character), offsetof(Data, frequency)};
    MPI_Datatype rawType;

    MPI_Type_create_struct(2, blockLengths, offsets, types, &rawType);
    MPI_Type_create_resized(rawType, 0, (MPI_Aint) sizeof(Data), dictType);
    MPI_Type_commit(dictType);

    /* The resized type keeps its own reference; the intermediate one can go. */
    MPI_Type_free(&rawType);
}
/*
 * Append a copy of `*data` to the end of `dict`, growing its array by one.
 *
 * dict: dictionary to extend; dict->data may be NULL when dict->charsNr is 0.
 * data: entry to copy. It may point into dict->data itself.
 *
 * The process is aborted if memory cannot be obtained: the function has no
 * way to report failure, and continuing would silently drop the entry.
 */
void appendData(Dictionary *dict, Data *data) {
    /* Copy first: `data` may alias the array that realloc is about to move. */
    const Data entry = *data;

    /* Keep the old pointer until realloc succeeds so it is never lost. */
    Data *grown = realloc(dict->data, sizeof *grown * ((size_t) dict->charsNr + 1));
    if (grown == NULL) {
        perror("appendData: realloc");
        abort();
    }

    grown[dict->charsNr] = entry;
    dict->data = grown;
    ++dict->charsNr;
}
/*
 * Fold every entry of `src` into `dst`.
 *
 * For each source entry, the first destination entry with the same character
 * has the source frequency added to it; if no such entry exists, the source
 * entry is appended to `dst` via appendData(). `src` is left untouched.
 */
void mergeDicts(Dictionary *dst, Dictionary *src) {
    for (int s = 0; s < src->charsNr; s++) {
        Data *incoming = &src->data[s];

        /* Linear search for the first destination entry with the same key. */
        int d = 0;
        while (d < dst->charsNr && dst->data[d].character != incoming->character)
            d++;

        if (d < dst->charsNr)
            dst->data[d].frequency += incoming->frequency;
        else
            appendData(dst, incoming);
    }
}
/*
 * Return a pseudo-random integer in the closed range [from, to], drawn with
 * rand().
 *
 * An empty or inverted range (to < from) yields `from` instead of dividing by
 * zero or taking a modulo by a negative span. The span is computed in a wider
 * type so that very wide ranges cannot overflow `int`.
 */
int getRand(const int from, const int to)
{
    if (to < from)
        return from;

    const long long span = (long long) to - from + 1;
    return (int) (from + rand() % span);
}
/*
 * Report the size, in bytes, of the next pending message from `rank` with
 * tag `tag` on MPI_COMM_WORLD, without receiving it.
 *
 * size:   out-parameter written by MPI_Get_count with the count in MPI_BYTE units.
 * rank:   source rank passed to MPI_Probe.
 * tag:    message tag passed to MPI_Probe.
 * status: filled in by MPI_Probe and then read by MPI_Get_count.
 */
void getMessageSize(int *size, int rank, int tag, MPI_Status *status) {
/* MPI_Probe blocks until a matching message is available and describes it in *status. */
MPI_Probe(rank, tag, MPI_COMM_WORLD, status);
MPI_Get_count(status, MPI_BYTE, size);
}
/*
 * Serialise `dict` into a newly allocated packed buffer: first the entry
 * count (one MPI_INT), then the entries themselves (*header->type each).
 *
 * header: header->type must point to the committed datatype of one Data
 *         element. On return header->size holds the capacity of the buffer
 *         and header->position the number of bytes actually packed, which is
 *         the length to transmit.
 * dict:   dictionary to serialise; it is not modified.
 *
 * Returns the buffer, owned by the caller (release with free()), or NULL if
 * the allocation failed.
 *
 * The capacity comes from MPI_Pack_size rather than sizeof: the packed
 * representation is implementation-defined and may be larger than the
 * in-memory one.
 */
BYTE* packDictionary(Header *header, Dictionary *dict) {
    int countBytes = 0;
    int dataBytes = 0;

    MPI_Pack_size(1, MPI_INT, MPI_COMM_WORLD, &countBytes);
    MPI_Pack_size(dict->charsNr, *header->type, MPI_COMM_WORLD, &dataBytes);

    header->size = countBytes + dataBytes;
    header->position = 0;

    BYTE *buffer = malloc((size_t) header->size);
    if (buffer == NULL)
        return NULL;

    MPI_Pack(&dict->charsNr, 1, MPI_INT, buffer, header->size, &header->position, MPI_COMM_WORLD);
    MPI_Pack(dict->data, dict->charsNr, *header->type, buffer, header->size, &header->position, MPI_COMM_WORLD);
    return buffer;
}
/*
 * Rebuild a dictionary from a buffer laid out as packDictionary() writes it:
 * an MPI_INT entry count followed by that many *header->type elements.
 *
 * header: header->size must be the size in bytes of `buffer` and header->type
 *         must point to the committed datatype of one Data element.
 *         header->position is rewound to 0 before reading (mirroring
 *         packDictionary) and ends at the number of bytes consumed.
 * dict:   receives the result. Its previous contents are overwritten without
 *         being freed, so pass an empty dictionary. The caller owns
 *         dict->data afterwards and releases it with free().
 * buffer: packed bytes to read.
 *
 * A non-positive entry count produces an empty dictionary. The job is
 * aborted if the entry array cannot be allocated.
 */
void unpackDictionary(Header *header, Dictionary *dict, BYTE *buffer) {
    int count = 0;

    dict->charsNr = 0;
    dict->data = NULL;

    /* Always start at the beginning, even if the header was just used to pack. */
    header->position = 0;
    MPI_Unpack(buffer, header->size, &header->position, &count, 1, MPI_INT, MPI_COMM_WORLD);
    if (count <= 0)
        return;

    Data *entries = malloc(sizeof *entries * (size_t) count);
    if (entries == NULL) {
        fprintf(stderr, "unpackDictionary: out of memory for %d entries\n", count);
        MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
        return;
    }

    MPI_Unpack(buffer, header->size, &header->position, entries, count, *header->type, MPI_COMM_WORLD);
    dict->data = entries;
    dict->charsNr = count;
}
/*
 * User-defined reduction operator (MPI_User_function) that merges packed
 * dictionaries: for every element i, out[i] = merge(out[i], in[i]).
 *
 * in:      first operand; `*len` fixed-size slots of packed bytes (read-only).
 * out:     second operand and result; same layout, overwritten in place.
 * len:     number of slots to combine.
 * typeptr: datatype of one slot. It is expected to be a contiguous run of
 *          MPI_PACKED bytes; its size is the capacity of each slot.
 *
 * MPI reductions work on fixed-size elements, so a dictionary of variable
 * length has to travel inside a slot whose capacity is the same on every
 * rank and large enough for the merged result. The size of a slot is obtained
 * from the datatype with MPI_Type_size; the element datatype is rebuilt
 * locally with buildMyType() because the operator receives no user context.
 * If a merged dictionary does not fit into its slot, the job is aborted.
 */
void MyTypeOp(void *in, void *out, int *len, MPI_Datatype *typeptr)
{
    int slotBytes = 0;
    MPI_Type_size(*typeptr, &slotBytes);

    MPI_Datatype dictType;
    buildMyType(&dictType);

    for (int i = 0; i < *len; i++) {
        BYTE *inSlot = (BYTE *) in + (size_t) i * (size_t) slotBytes;
        BYTE *outSlot = (BYTE *) out + (size_t) i * (size_t) slotBytes;

        Dictionary inDict = {.charsNr = 0, .data = NULL};
        Dictionary outDict = {.charsNr = 0, .data = NULL};
        Header header = {.id = 0, .size = slotBytes, .type = &dictType, .position = 0};

        unpackDictionary(&header, &inDict, inSlot);
        /* Rewind explicitly so the second slot is read from its start. */
        header.position = 0;
        unpackDictionary(&header, &outDict, outSlot);

        mergeDicts(&outDict, &inDict);

        /* packDictionary() resets the header and reports the bytes used in position. */
        BYTE *packed = packDictionary(&header, &outDict);
        if (packed == NULL || header.position > slotBytes) {
            fprintf(stderr, "MyTypeOp: merged dictionary (%d entries) does not fit in a %d-byte slot\n",
                    outDict.charsNr, slotBytes);
            MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
            return;
        }

        /* The result must be written into the caller's buffer, not a new one. */
        memcpy(outSlot, packed, (size_t) header.position);

        free(packed);
        free(inDict.data);
        free(outDict.data);
    }

    MPI_Type_free(&dictType);
}
int main(int argc, char **argv)
{
int proc_number;
int rank;
MPI_Comm_size(MPI_COMM_WORLD, &proc_number);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
Dictionary dict = {.charsNr = NUM_CHARS, .data = NULL};
dict.data = malloc(dict.charsNr * sizeof(Data));
// create a random dictionary
// I use for simplicity NUM_CHARS but in real life it will be unknown
// at the beginning and every dictionary can have a different size
for (int i = 0; i < NUM_CHARS; i++) {
int freq = getRand(1, 10);
dict.data[i].frequency = freq;
dict.data[i].character = 'a' + getRand(1, 5) + i + rank;
}
MPI_Datatype dictType;
buildMyType(&dictType);
MPI_Op MyOp;
MPI_Op_create((MPI_User_function *) MyTypeOp, 1, &MyOp);
MPI_Datatype contType;
MPI_Type_contiguous(1, MPI_PACKED, &contType);
MPI_Type_commit(&contType);
Header header = {.id = 0, .size = 0, .type = &dictType, .position = 0};
// when I pack the message I put everithing inside a buffer of BYTE
BYTE *buffer = packDictionary(&header, &dict);
BYTE *buffer_rcv = NULL;
MPI_Reduce(&buffer,& buffer_rcv, 1, contType, MyOp, 0, MPI_COMM_WORLD);
if(rank == 0)
for (i = 0; i < NUM_CHARS; i++)
printf("%c: %d\n", dict.data[i].character, dict.data[i].frequency);
MPI_Type_free(&contType);
MPI_Type_free(&dictType);
MPI_Op_free(&MyOp);
free(buffer);
free(buffer_rcv);
free(dict.data);
MPI_Finalize();
return 0;
}
Of course this example cannot run.
Do you have any suggestions on how I can do it?
I'm writing the code in C on a Linux machine.
Thanks.
Upvotes: 0
Views: 47