Brandon Jerz

Reputation: 153

Using MPI to take in data, temporarily use it, and then return a result

I have an array of starting locations in a file, one for each baby node in my program to read from. I'm trying to have the head node send each node the location where it should start reading, and then have each node send back a result.
What makes it difficult is that the number of lines in the file is not an even multiple of the number of nodes, so the nodes will have to be reused over and over. To accomplish this, I tried using for loops for the sends and receives: the head node sends one message per line in the file, and each baby node receives as many messages as the number of lines in the file divided by the number of baby nodes.

To put it simply, it's not working out for me and I really don't know what to do.

if(qNum == 1){  //If query Number is one
    if(firstSource == 1){ //And the source is equal to 1
        if(my_rank == 0){ // if this process is the head node
            int startVal = 0; // declare variable for starting value
            int z = 1; // declare variable to loop through baby nodes
            for(int i = 1; i <= enronInfo[0]; i++){  // for # of lines in file
                if(z == world_size){ // if process num equals largest process num reset to 1
                    z = 1;
                }
                startVal = getFseekVal(i, firstSource); //set the startVal to the value at location I in the array.
                MPI_Send(&startVal, 1, MPI_INT, z, i, MPI_COMM_WORLD); //send a message to processor z with the startVal
            }
            MPI_Barrier(MPI_COMM_WORLD); //Don't know if this helps
            if(my_rank != 0){ //if not the headnode
                int startVal; // declare variable for starting value
                for(int i = 0; i<=babyLoopSize; i++){ // for # of lines in processor divided by # of babynodes
                    MPI_Recv(&startVal, 1, MPI_INT, 0, MPI_ANY_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE); // receive a message with startVal from the headnode
                }
            }
            MPI_Barrier(MPI_COMM_WORLD); // Don't know if this helps
        }
    }







#include <fstream>
#include <string>
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>

using namespace std;
int enronDSarr[39859], nipsDSarr[1499], kosDSarr[3429], nytDSarr[299999]; //containers for docstring values
string enronV = "/home/mcconnel/BagOfWords/vocab.enron.txt";
string nipsV = "/home/mcconnel/BagOfWords/vocab.nips.txt";
string kosV = "/home/mcconnel/BagOfWords/vocab.kos.txt";
string nytV = "/home/mcconnel/BagOfWords/vocab.nytimes.txt";
string enronDW = "/home/mcconnel/BagOfWords/docword.enron.txt";
string nipsDW = "/home/mcconnel/BagOfWords/docword.nips.txt";
string kosDW = "/home/mcconnel/BagOfWords/docword.kos.txt";             // Strings for locations of each file
string nytDW = "/home/mcconnel/BagOfWords/docword.nytimes.txt";
string enronDS = "/home/mcconnel/BagOfWords/docstart.enron.txt";
string nipsDS = "/home/mcconnel/BagOfWords/docstart.nips.txt";
string kosDS = "/home/mcconnel/BagOfWords/docstart.kos.txt";
string nytDS = "/home/mcconnel/BagOfWords/docstart.nytimes.txt";
int enronInfo[3], nipsInfo[3], kosInfo[3], nytInfo[3];                  // Arrays storing first 3 lines of info from each DocWords file
int firstSource, secondSource, numTimes, qNum, wordLength;
string enteredWord;
char enteredWordChar[50];



int word2int(string fileLocation, string input){                // converts text from a file into a word count value
        ifstream file;
        file.open(fileLocation.c_str());
        string word;
        int i = 1;
        int wordNum;
        while(file.good()){
                file >> word;
                if(word.compare(input)== 0){
                        wordNum = i;
                        return wordNum;
                }
                i++;
        }
        return wordNum;
}

int getFseekVal(int docNumber, int sourceNumber){
        if(sourceNumber  == 1){
                return enronDSarr[docNumber - 1];
        }
        else if(sourceNumber == 2){
                return nipsDSarr[docNumber - 1];
        }
        else if(sourceNumber == 3){
                return kosDSarr[docNumber - 1];
        }
        else{
                return nytDSarr[docNumber - 1];
        }
}


string int2word(string fileLocation, int wordInt){              // converts a word count value from a file into the actual text
        ifstream file;
        file.open(fileLocation.c_str());
        string word;
        int i = 1;
        string retWord;
        while(file.good()){
                file >> word;
                if(i == wordInt){
                        retWord = word;
                        return retWord;
                }
                i++;
        }
        return 0;
}


int getInfoDW(string fileLocation, int pos){                    //imports an array of length 3 for each document's info in the docwords file
        ifstream file;
        file.open(fileLocation.c_str());
        int word;
        int i = 0;
        int retWord;
        while(file.good()){
                file >> word;
                if(i == 0 && pos == 0){
                        return word;
                }
                if(i == 1 && pos == 1){
                        return word;
                }
                if(i == 2 && pos == 2){
                        return word;
                }
                i++;
        }
        return retWord;
}

int getEnronDS(string fileLocation){                            // imports array from Enron docstart file
    int i;
    int z = 0;
    FILE *MyFile;
    char line[25];
    MyFile=fopen(fileLocation.c_str(), "r");
    for(i = 0; i<39861; i++){
        fscanf(MyFile, "%s", line);
        if(i != 0 && i % 2 == 1){
                enronDSarr[z] = atoi(line);
                z++;
        }
    }
    fclose(MyFile);
}


int getNipsDS(string fileLocation){                             // imports array from NIPS docstart file
    int i;
    int z = 0;
    FILE *MyFile;
    char line[25];
    MyFile=fopen(fileLocation.c_str(),"r");
    for(i = 0; i<1500; i++){
        fscanf(MyFile, "%s", line);
        if(i != 0 && i % 2 == 1){
                nipsDSarr[z] = atoi(line);
                z++;
        }
    }
    fclose(MyFile);
}

int getKosDS(string fileLocation){                              // imports array from KOS docstart file
    int i;
    int z = 0;
    FILE *MyFile;
    char line[25];
    MyFile=fopen(fileLocation.c_str(),"r");
    for(i = 0; i<3430; i++){
        fscanf(MyFile, "%s", line);
        if(i != 0 && i % 2 == 1){
                kosDSarr[z] = atoi(line);
                z++;
        }
    }
    fclose(MyFile);
}

int getNytDS(string fileLocation){                              // imports array from NYT docstart file
    int i;
    int z = 0;
    FILE *MyFile;
    char line[25];
    MyFile=fopen(fileLocation.c_str(),"r");
    for(i = 0; i<300000; i++){
        fscanf(MyFile, "%s", line);
        if(i != 0 && i % 2 == 1){
                nytDSarr[z] = atoi(line);
                z++;
        }
    }
    fclose(MyFile);
}

int getCurrentDS(int fileNumber, int documentNum){              //Will be used to return docstart byte value at document location
        if(fileNumber == 0){
                return enronDSarr[documentNum - 1];
        }
        if(fileNumber == 1){
                return enronDSarr[documentNum - 1];
        }
        if(fileNumber == 2){
                return enronDSarr[documentNum - 1];
        }
        if(fileNumber == 3){
                return enronDSarr[documentNum - 1];
        }
        else{
                printf("Something definitely went wrong");
        }
}

int getSourceNumber(){
        int source;
        printf("Select a wordbag:\n 1. Enron \n 2. NIPS \n 3. KOS\n 4. NYT\n");
        cin >> source;
        return source;
}
int getUserResponse(){
   int i = 1;
   while(i){
        printf("Choose a query(1-4) and press enter:\n");
        printf("1. What percent of documents in X use any one word more than ____ times?\n");
        printf("2. What words in X are used more than ____ times in any document?\n");
        printf("3. In which data set does the word ____ appear most frequently?\n");
        printf("4. Does ____ appear more frequently in X or Y?\n");
        cin >> qNum;
        if(qNum < 5 && qNum > 0){
                i = 0;
                printf("%d", qNum);
        }
        else{
                printf("Invalid Response, Please Try Again \n");
        }
        if(qNum == 1){
                firstSource = getSourceNumber();
                printf("and how many times?\n");
                cin >> numTimes;
               // query1(firstSource, numTimes);

        }
        else if(qNum == 2){
                firstSource = getSourceNumber();
                printf("and how many times?\n");
                cin >> numTimes;

        }
        else if(qNum == 3){
                printf("What word would you like to use?\n");
                cin >> enteredWord;
        }
        else if(qNum == 4){
                printf("What word would you like to use?\n");
                cin >> enteredWord;
                printf("Select your first source...\n");
                firstSource = getSourceNumber();
                printf("Select your second source...\n");
                secondSource = getSourceNumber();
        }

    }
}

void importFiles(){
        getEnronDS(enronDS);
        getNipsDS(nipsDS);
        getKosDS(kosDS);                                                        // Functions to read in arrays for each docstart fil
        getNytDS(enronDS);
        for(int a = 0; a <= 2; a++){
                 enronInfo[a] = getInfoDW(enronDW, a);
                 nipsInfo[a] = getInfoDW(nipsDW, a);
                 kosInfo[a] = getInfoDW(kosDW, a);
                 nytInfo[a] = getInfoDW(nytDW, a);
        }
}




int main(int argc, char** argv){
        int world_size, my_rank, numDocs,fseekVal, babyLoopSize, babyLoopSize2, babyLoopSize3, babyLoopSize4;
        MPI_Init(NULL,NULL);
        MPI_Comm_size(MPI_COMM_WORLD, &world_size);
        MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);
        cout << " my rank is " << my_rank << "\n";
        importFiles();
        cout << " my rank is " << my_rank << " I know that enron's doc size is " << enronInfo[0] << "\n";
        if(my_rank == 0){
                getUserResponse();
        }
        MPI_Barrier(MPI_COMM_WORLD);
        babyLoopSize = enronInfo[0] / (world_size - 1);
        babyLoopSize2 = nipsInfo[0] / (world_size - 1);
        babyLoopSize3 = kosInfo[0] / (world_size - 1);
        babyLoopSize4 = nytInfo[0] / (world_size - 1);
        //cout << " my rank is " << my_rank << " I know that enron's doc size is " << enronInfo[0] << " and that my babyLoopSize for enron is " << babyLoopSize << "\n";
        //cout << " my rank is " << my_rank << " I know that qNum is  " << qNum << "\n";
        MPI_Bcast(&qNum, 1, MPI_INT, 0, MPI_COMM_WORLD);
        MPI_Bcast(&firstSource, 2, MPI_INT, 0, MPI_COMM_WORLD);
        MPI_Bcast(&secondSource,3, MPI_INT, 0, MPI_COMM_WORLD);
        MPI_Bcast(&enteredWordChar,4, MPI_CHAR, 0, MPI_COMM_WORLD);


        if(qNum == 1){
                if(firstSource == 1){
                        if(my_rank == 0){
                                int startVal = 0;
                                int z = 1;
                                for(int i = 1; i <= enronInfo[0]; i++){
                                        if(z == world_size){
                                                z = 1;
                                        }
                                        startVal = getFseekVal(i, firstSource);
                                        MPI_Send(&startVal, 1, MPI_INT, z, i, MPI_COMM_WORLD);
                                }
                        if(my_rank != 0){
                                int startVal;
                                for(int i = 0; i<=babyLoopSize; i++){
                                        MPI_Recv(&startVal, 1, MPI_INT, 0, MPI_ANY_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
                                }
                        }
                        MPI_Barrier(MPI_COMM_WORLD);
                }
        }




        MPI_Finalize();

}

Upvotes: 0

Views: 292

Answers (1)

ilent2

Reputation: 5241

MPI_Send is typically a blocking call. From the documentation:

This routine may block until the message is received by the destination process.

This means that each time the master process calls MPI_Send, it may have to wait until the corresponding worker process calls MPI_Recv before it can continue. The first MPI_Barrier makes every worker wait until the master has finished sending all of its data before any worker starts receiving, but because MPI_Send can block, the master may never return from its first call to MPI_Send.

For your problem, I would suggest starting by breaking it down into smaller pieces. Write code to answer each sub-problem:

// Determine how many workers we have
int nWorkers = ...;

// Determine where in the file each worker should start
// If we store each location in an array we can make use of
// another MPI command later
int aStartingLocs[nWorkers] = {...};

// Distribute starting locations to each worker
int nMyStart;
MPI_Scatter(aStartingLocs, 1, MPI_INT,   //< Things to send
            &nMyStart, 1, MPI_INT,       //< Received value
            ...);

// Our starting location is stored in nMyStart
// TODO: Use starting location to compute results
double dResult = ...;

// We can use MPI_Gather to send the values back to master
double aResults[nWorkers];
MPI_Gather(&dResult, 1, MPI_DOUBLE,    //< What we are sending
           aResults, 1, MPI_DOUBLE,    //< Where to store the result
           ...);

// Now we can use the result how we choose
if (bIAmMaster)
{
   // TODO: Use the results
}

The above is just an outline (semi-pseudo code/commenting), but hopefully you will be able to use it as a guide.

I haven't used MPI in a while, so you should definitely check the syntax of MPI_Scatter and MPI_Gather. Check out the first link that came up on Google.

If you need to scatter a number of items that is not divisible by the number of workers, you can use MPI_Scatterv and MPI_Gatherv instead; see the man page.

If you are so inclined, you can attempt to implement your own versions of MPI_Scatter(v); I recall an HPC assignment on doing exactly that. However, most of the time it is probably easier and better to just use the library functions.

Upvotes: 1
