ColMath

Reputation: 21

C++ MPI Send Receive Issues

I am working on a parallel computing project but am running into an issue with the cores communicating. The program runs, but it stalls at various points, and how far it gets depends on the number of cores used. The goal is to compute an integral of a highly oscillatory function with several quadrature methods and sum the results. The work for each wavenumber should be split across the cores, the partial results brought back and summed, and then the whole process repeated for the next wavenumber; the wavenumbers range over [100, 10000]. For each wavenumber the integration domain goes from 0 to pi and needs to be split into equal parts using the wavenumber.

For this code, if I run it on one core I am able to make it to wavenumber 120; with 2 cores it goes until about wavenumber 240, 3 cores until 360, and 4 cores until 477. This trend continues more or less.

Any insight on what I need to fix is greatly appreciated.

#include <iostream>
#include <fstream>
#include <vector>
#include <typeinfo>
#include "mpi.h"
#include "math.h"

using namespace std;

double fun_val_points(int kw, double x);
vector<double> integrand(double (fn)(int wn, double x), double wn, vector<double> pts, int qn);

const double PI = 3.141592653589793;

int main(int argc, char* argv[]) {
  MPI::Init(argc, argv);

  for (int wavenum = 100; wavenum <= 10000; wavenum++){
    // Declare variables
    int rank, numcores, tag = 123;
    int div, rem, qn = 101;             // qn represents the number of quadrature points

    // Initialize variables
    rank = MPI::COMM_WORLD.Get_rank();
    numcores = MPI::COMM_WORLD.Get_size();
    div = int((wavenum+1) / numcores);
    rem = (wavenum+1) % numcores;
    vector<double> points(div + rem, 0.0);

    if (rank == 0) {
      // quadraturePoints needs to be k+1 equally spaced points between 0 and pi.
      // It then needs to be distributed to each core in a load balanced way.
      vector<double> quadraturePoints(wavenum + 1, 0.0);
      for (int i = 0; i < wavenum + 1; i++) {
        quadraturePoints[i] = PI * (i) / (wavenum);    // Initialize quadraturePoints
      }

      for (int j = 0; j < numcores - 1; j++) {
        vector<double> sendpoints = vector<double>(div + rem, 0.0);
        int index = 0;
        for (int n = j * div; (n <= (j + 1) * div) && (index < div + rem); n++) {
          sendpoints[index++] = quadraturePoints[n];    // Get the relevant quadrature points for this core
        }
        // Send the data to the core
        MPI::COMM_WORLD.Send(&sendpoints[0], sendpoints.size(), MPI_DOUBLE, j, tag);
      }

      // Send data to the last core, which needs to have any remaining quadrature points
      vector<double> sendpoints = vector<double>(div + rem, 0.0);
      int index = 0;
      for (int n = (numcores-1) * div; n < wavenum + 1; n++) {
        sendpoints[index++] = quadraturePoints[n];
      }
      MPI::COMM_WORLD.Send(&sendpoints[0], sendpoints.size(), MPI_DOUBLE, numcores - 1, tag);
    }

    vector<double> localQuads(div+rem,0.0);
    // Receive the quadrature points for local calculation
    MPI::COMM_WORLD.Recv(&localQuads[0], div + rem + 1, MPI_DOUBLE, 0, tag);
    while(!localQuads.empty() && localQuads.back() <= .00000001){
      // Remove any spare 0's on the end of the quad points. This is here because
      // some quadrature points were sent as longer lists than necessary so that
      // they would all be the same length.
      localQuads.pop_back();
    }


    vector<double> localWTS(3, 0.0);  // To keep track of the integrals across the local quad points
    for(int i = 0; i < localQuads.size()-1; i++){
      vector<double> partition(qn+1, 0.0);  // Partition the quadrature
      for (double j = 0; j < qn+1; j++){
        partition[j] = localQuads[i] + (j/qn)*(localQuads[i+1] - localQuads[i]);
      }
      // Integrate the partition
      vector<double> temp = integrand(fun_val_points, wavenum, partition, partition.size());
      for (int j = 0; j < 3; j++){
        localWTS[j] += temp[j]; // Add the integrated values to the running total
      }
    }

    // Send the local integration values back to master
    MPI::COMM_WORLD.Send(&localWTS[0], 3, MPI_DOUBLE, 0, tag);


    if (rank == 0) {
      vector<double> totalWTS(3, 0.0);
      for (int commRank = 0; commRank < numcores; commRank++) {
        // Gather all the integral values and add them to the running total
        MPI::COMM_WORLD.Recv(&localWTS[0], 3, MPI_DOUBLE, commRank, tag);
        for (int index = 0; index < 3; index++) {
          totalWTS[index] += localWTS[index];
        }
      }

      ofstream out; // Open a text file for output to disk
      out.open("data.txt", ios_base::app);

      if(!out.is_open()){ // In case there was an error opening the file
        cout << "Error opening output file." << endl;
      }

      out << wavenum << " ";
      out.precision(16);
      out << totalWTS[0] << " " << totalWTS[1] << " " << totalWTS[2] << endl;
      out.close();
    }
  }
  MPI::Finalize();
  return 0;
}

double fun_val_points(int kw, double x) {
    return cos(100 * x - kw*sin(x));
}

vector<double> integrand(double (fn)(int wn, double x), double wn, vector<double> pts, int qn) {
    double M, T, S;
    M = 0;
    T = 0;
    for (int j = 1; j < qn; j++) {
        double len = pts[j] - pts[j - 1];
        double mpts = (pts[j] + pts[j - 1]) / 2;

        M += fn(wn, mpts)*len;
        T += (fn(wn, pts[j - 1]) + fn(wn, pts[j]))*len / 2;
    }
    S = M * 2 / 3 + T * 1 / 3;

    return {M, T, S};
} 

Upvotes: 0

Views: 154

Answers (1)

Gilles Gouaillardet

Reputation: 8395

With respect to the MPI standard, your program is incorrect.

The reason is that task 0 MPI_Send()s to itself while no matching receive has been posted.

From a pragmatic point of view, that will "work" just fine with small messages but hang with large ones. What counts as small vs. large depends on your MPI library, the interconnect you use, and other factors, so, long story short, assume this pattern will lead to a hang and do not do it. That is also why more cores let you reach a higher wavenumber before the hang: each rank's share of the quadrature points, and hence the size of the message task 0 sends to itself, shrinks as the number of cores grows.

That is typically avoided by using MPI_Irecv() or MPI_Sendrecv().
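For example, here is a minimal sketch of the MPI_Irecv() variant, reusing the variable names from your code (it is not a drop-in fix): every rank, including task 0, posts a non-blocking receive for its chunk before the sends are issued and waits on it afterwards, so the blocking Recv() goes away.

    // Sketch only: post the receive first so the send from task 0 to itself
    // always has a matching receive pending.
    vector<double> localQuads(div + rem, 0.0);
    MPI::Request req = MPI::COMM_WORLD.Irecv(&localQuads[0], div + rem,
                                             MPI_DOUBLE, 0, tag);

    if (rank == 0) {
        // ... build quadraturePoints and issue the MPI::COMM_WORLD.Send()
        // calls exactly as before, including the one to rank 0 itself ...
    }

    req.Wait();   // replaces the blocking Recv(); localQuads is now filled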

That being said, your communication pattern is begging for MPI collective operations: MPI_Scatterv(quadraturePoints, ...) and then MPI_Reduce(localWTS, ...).
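Here is a rough sketch of that collective pattern, again reusing names from your code. The counts/displs below are only illustrative, and you will still have to deal with the endpoint shared by neighbouring chunks (e.g. recompute it locally, since the points are just PI*i/wavenum).

    // Rough sketch: scatter the quadrature points, then reduce the partial integrals.
    vector<double> quadraturePoints;                 // only filled on the root
    if (rank == 0) {
        quadraturePoints.resize(wavenum + 1);
        for (int i = 0; i <= wavenum; i++)
            quadraturePoints[i] = PI * i / wavenum;
    }

    int total = wavenum + 1;
    vector<int> counts(numcores), displs(numcores);  // illustrative load balancing
    for (int r = 0; r < numcores; r++) {
        counts[r] = total / numcores + (r < total % numcores ? 1 : 0);
        displs[r] = (r == 0) ? 0 : displs[r - 1] + counts[r - 1];
    }

    // Every rank, including 0, receives its own chunk: no self Send()/Recv().
    vector<double> localQuads(counts[rank], 0.0);
    MPI_Scatterv(quadraturePoints.data(), counts.data(), displs.data(), MPI_DOUBLE,
                 localQuads.data(), counts[rank], MPI_DOUBLE, 0, MPI_COMM_WORLD);

    // ... compute localWTS from localQuads as in your code ...

    // Sum the three partial integrals (M, T, S) onto rank 0 in one call.
    vector<double> totalWTS(3, 0.0);
    MPI_Reduce(localWTS.data(), totalWTS.data(), 3, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);

With this pattern, each iteration of the wavenumber loop involves exactly one scatter and one reduce, and no rank ever sends a message to itself.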

Upvotes: 1
