Reputation: 12559
EDIT: Sorry I forgot to mention that I multiply two 5000x5000 matrices.
The timing output shows that when I increase the number of processes, the running time increases too. So is there a problem in the logic of this code? I found it on the web and only changed the name to matrixMulti and its printf. It seems logical to me, but it does not work properly when I connect to the Grid lab and increase the number of processes. What do you think?
/**********************************************************************************************
* Matrix Multiplication Program using MPI.
*
* Viraj Brian Wijesuriya - University of Colombo School of Computing, Sri Lanka.
*
* Works with any pair of matrices [A], [B] that can be multiplied to produce a matrix [C].
*
* Master process initializes the multiplication operands, distributes the multiplication
* operation to worker processes and reduces the worker results to construct the final output.
*
************************************************************************************************/
#include<stdio.h>
#include<mpi.h>
#define NUM_ROWS_A 5000 //rows of input [A]
#define NUM_COLUMNS_A 5000 //columns of input [A]
#define NUM_ROWS_B 5000 //rows of input [B]
#define NUM_COLUMNS_B 5000 //columns of input [B]
#define MASTER_TO_SLAVE_TAG 1 //tag for messages sent from master to slaves
#define SLAVE_TO_MASTER_TAG 4 //tag for messages sent from slaves to master
void makeAB(); //makes the [A] and [B] matrices
void printArray(); //print the content of output matrix [C];
int rank; //process rank
int size; //number of processes
int i, j, k; //helper variables
double mat_a[NUM_ROWS_A][NUM_COLUMNS_A]; //declare input [A]
double mat_b[NUM_ROWS_B][NUM_COLUMNS_B]; //declare input [B]
double mat_result[NUM_ROWS_A][NUM_COLUMNS_B]; //declare output [C]
double start_time; //hold start time
double end_time; // hold end time
int low_bound; //low bound of the number of rows of [A] allocated to a slave
int upper_bound; //upper bound of the number of rows of [A] allocated to a slave
int portion; //portion of the number of rows of [A] allocated to a slave
MPI_Status status; // store status of an MPI_Recv
MPI_Request request; //capture request of an MPI_Isend
int main(int argc, char *argv[])
{
MPI_Init(&argc, &argv); //initialize MPI operations
MPI_Comm_rank(MPI_COMM_WORLD, &rank); //get the rank
MPI_Comm_size(MPI_COMM_WORLD, &size); //get number of processes
/* master initializes work*/
if (rank == 0) {
makeAB();
start_time = MPI_Wtime();
for (i = 1; i < size; i++) {//for each slave other than the master
portion = (NUM_ROWS_A / (size - 1)); // calculate portion without master
low_bound = (i - 1) * portion;
if (((i + 1) == size) && ((NUM_ROWS_A % (size - 1)) != 0)) {//if rows of [A] cannot be equally divided among slaves
upper_bound = NUM_ROWS_A; //last slave gets all the remaining rows
} else {
upper_bound = low_bound + portion; //rows of [A] are equally divisible among slaves
}
//send the low bound first without blocking, to the intended slave
MPI_Isend(&low_bound, 1, MPI_INT, i, MASTER_TO_SLAVE_TAG, MPI_COMM_WORLD, &request);
//next send the upper bound without blocking, to the intended slave
MPI_Isend(&upper_bound, 1, MPI_INT, i, MASTER_TO_SLAVE_TAG + 1, MPI_COMM_WORLD, &request);
//finally send the allocated row portion of [A] without blocking, to the intended slave
MPI_Isend(&mat_a[low_bound][0], (upper_bound - low_bound) * NUM_COLUMNS_A, MPI_DOUBLE, i, MASTER_TO_SLAVE_TAG + 2, MPI_COMM_WORLD, &request);
}
}
//broadcast [B] to all the slaves
MPI_Bcast(&mat_b, NUM_ROWS_B*NUM_COLUMNS_B, MPI_DOUBLE, 0, MPI_COMM_WORLD);
/* work done by slaves*/
if (rank > 0) {
//receive low bound from the master
MPI_Recv(&low_bound, 1, MPI_INT, 0, MASTER_TO_SLAVE_TAG, MPI_COMM_WORLD, &status);
//next receive upper bound from the master
MPI_Recv(&upper_bound, 1, MPI_INT, 0, MASTER_TO_SLAVE_TAG + 1, MPI_COMM_WORLD, &status);
//finally receive row portion of [A] to be processed from the master
MPI_Recv(&mat_a[low_bound][0], (upper_bound - low_bound) * NUM_COLUMNS_A, MPI_DOUBLE, 0, MASTER_TO_SLAVE_TAG + 2, MPI_COMM_WORLD, &status);
for (i = low_bound; i < upper_bound; i++) {//iterate through a given set of rows of [A]
for (j = 0; j < NUM_COLUMNS_B; j++) {//iterate through columns of [B]
for (k = 0; k < NUM_ROWS_B; k++) {//iterate through rows of [B]
mat_result[i][j] += (mat_a[i][k] * mat_b[k][j]);
}
}
}
//send back the low bound first without blocking, to the master
MPI_Isend(&low_bound, 1, MPI_INT, 0, SLAVE_TO_MASTER_TAG, MPI_COMM_WORLD, &request);
//send the upper bound next without blocking, to the master
MPI_Isend(&upper_bound, 1, MPI_INT, 0, SLAVE_TO_MASTER_TAG + 1, MPI_COMM_WORLD, &request);
//finally send the processed portion of data without blocking, to the master
MPI_Isend(&mat_result[low_bound][0], (upper_bound - low_bound) * NUM_COLUMNS_B, MPI_DOUBLE, 0, SLAVE_TO_MASTER_TAG + 2, MPI_COMM_WORLD, &request);
}
/* master gathers processed work*/
if (rank == 0) {
for (i = 1; i < size; i++) {// until all slaves have handed back the processed data
//receive low bound from a slave
MPI_Recv(&low_bound, 1, MPI_INT, i, SLAVE_TO_MASTER_TAG, MPI_COMM_WORLD, &status);
//receive upper bound from a slave
MPI_Recv(&upper_bound, 1, MPI_INT, i, SLAVE_TO_MASTER_TAG + 1, MPI_COMM_WORLD, &status);
//receive processed data from a slave
MPI_Recv(&mat_result[low_bound][0], (upper_bound - low_bound) * NUM_COLUMNS_B, MPI_DOUBLE, i, SLAVE_TO_MASTER_TAG + 2, MPI_COMM_WORLD, &status);
}
end_time = MPI_Wtime();
printf("\nRunning Time = %f\n\n", end_time - start_time);
printArray();
}
MPI_Finalize(); //finalize MPI operations
return 0;
}
void makeAB()
{
for (i = 0; i < NUM_ROWS_A; i++) {
for (j = 0; j < NUM_COLUMNS_A; j++) {
mat_a[i][j] = i + j;
}
}
for (i = 0; i < NUM_ROWS_B; i++) {
for (j = 0; j < NUM_COLUMNS_B; j++) {
mat_b[i][j] = i*j;
}
}
}
void printArray()
{
for (i = 0; i < NUM_ROWS_A; i++) {
printf("\n");
for (j = 0; j < NUM_COLUMNS_A; j++)
printf("%8.2f ", mat_a[i][j]);
}
printf("\n\n\n");
for (i = 0; i < NUM_ROWS_B; i++) {
printf("\n");
for (j = 0; j < NUM_COLUMNS_B; j++)
printf("%8.2f ", mat_b[i][j]);
}
printf("\n\n\n");
for (i = 0; i < NUM_ROWS_A; i++) {
printf("\n");
for (j = 0; j < NUM_COLUMNS_B; j++)
printf("%8.2f ", mat_result[i][j]);
}
printf("\n\n");
}
Upvotes: 1
Views: 1662
Reputation: 50927
There are some real correctness problems with the code as posted. Let's look at the send loop from rank 0:
for (i = 1; i < size; i++) {
//...
low_bound = (i - 1) * portion;
upper_bound = low_bound + portion;
MPI_Isend(&low_bound, 1, MPI_INT, i, MASTER_TO_SLAVE_TAG, MPI_COMM_WORLD, &request);
MPI_Isend(&upper_bound, 1, MPI_INT, i, MASTER_TO_SLAVE_TAG + 1, MPI_COMM_WORLD, &request);
MPI_Isend(&mat_a[low_bound][0], (upper_bound - low_bound) * NUM_COLUMNS_A, MPI_DOUBLE, i, MASTER_TO_SLAVE_TAG + 2, MPI_COMM_WORLD, &request);
}
You can't do that. If you're going to use non-blocking requests, you eventually have to MPI_Wait() or MPI_Test() for the requests so you (and the MPI library) can know that they are complete. You need to do this to avoid leaking resources, but even more importantly in this case, you're repeatedly overwriting low_bound and upper_bound before you even know that the send has taken place. Who knows what data your worker tasks are getting. In addition, by overwriting the request each time, you're guaranteed to leak resources.
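For reference, the minimal pattern looks something like this (just a sketch of the idea, not the full fix):
// Sketch: a non-blocking send must be completed with MPI_Wait (or a
// successful MPI_Test) before the buffer is reused or the request
// handle is overwritten.
MPI_Request req;
MPI_Isend(&low_bound, 1, MPI_INT, i, MASTER_TO_SLAVE_TAG, MPI_COMM_WORLD, &req);
/* ... other work that does not touch low_bound ... */
MPI_Wait(&req, MPI_STATUS_IGNORE);  // now low_bound and req may safely be reused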
There are a few ways to deal with this; the easiest is to create simple arrays of upper and lower bounds, and an array of requests:
if (rank == 0) {
makeAB();
MPI_Request *requests = malloc(size*3*sizeof(MPI_Request)); // three sends per worker; needs <stdlib.h> for malloc
int *low_bounds = malloc(size*sizeof(int));
int *upper_bounds = malloc(size*sizeof(int));
start_time = MPI_Wtime();
for (i = 1; i < size; i++) {
portion = (NUM_ROWS_A / (size - 1));
low_bounds[i] = (i - 1) * portion;
if (((i + 1) == size) && ((NUM_ROWS_A % (size - 1)) != 0)) {
upper_bounds[i] = NUM_ROWS_A;
} else {
upper_bounds[i] = low_bounds[i] + portion;
}
MPI_Isend(&(low_bounds[i]), 1, MPI_INT, i, MASTER_TO_SLAVE_TAG, MPI_COMM_WORLD, &(requests[3*i]));
MPI_Isend(&(upper_bounds[i]), 1, MPI_INT, i, MASTER_TO_SLAVE_TAG + 1, MPI_COMM_WORLD, &(requests[3*i+1]));
MPI_Isend(&mat_a[low_bounds[i]][0], (upper_bounds[i] - low_bounds[i]) * NUM_COLUMNS_A, MPI_DOUBLE, i, MASTER_TO_SLAVE_TAG + 2, MPI_COMM_WORLD, &(requests[3*i+2]));
}
MPI_Waitall(3*(size-1), &(requests[3]), MPI_STATUSES_IGNORE);
free(requests);
The nice thing about this is that since rank 0 is saving this information, the workers don't need to send it back when they're done, and rank 0 can just receive directly into the right place:
//...
for (i = low_bound; i < upper_bound; i++) {
for (j = 0; j < NUM_COLUMNS_B; j++) {
for (k = 0; k < NUM_ROWS_B; k++) {
mat_result[i][j] += (mat_a[i][k] * mat_b[k][j]);
}
}
}
MPI_Send(&mat_result[low_bound][0], (upper_bound - low_bound) * NUM_COLUMNS_B, MPI_DOUBLE, 0, SLAVE_TO_MASTER_TAG + 2, MPI_COMM_WORLD);
//...
if (rank == 0) {
for (i = 1; i < size; i++) {
MPI_Recv(&mat_result[low_bounds[i]][0], (upper_bounds[i] - low_bounds[i]) * NUM_COLUMNS_B, MPI_DOUBLE, i, SLAVE_TO_MASTER_TAG + 2, MPI_COMM_WORLD, &status);
}
But since you now have an array of these values that needs to be distributed to all processes, you can use an MPI_Scatter operation, which will in general be more efficient than your loop over sends:
for (i = 1; i < size; i++) {
low_bounds[i] = ...
upper_bounds[i] = ...
}
MPI_Scatter(low_bounds, 1, MPI_INT, &low_bound, 1, MPI_INT, 0, MPI_COMM_WORLD);
MPI_Scatter(upper_bounds, 1, MPI_INT, &upper_bound, 1, MPI_INT, 0, MPI_COMM_WORLD);
Ideally, you'd use scatter or its variants to distribute the A array as well.
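For instance, here is a rough sketch (my own addition, not part of the original code) of distributing the rows of [A] with MPI_Scatterv so that every rank, including rank 0, gets a contiguous block of rows; the counts and displacements are in numbers of doubles:
// Sketch: every rank computes the row decomposition, then MPI_Scatterv
// delivers each rank's block of [A] (rank 0's rows are already in place).
// Requires <stdlib.h> for malloc.
int *sendcounts = malloc(size * sizeof(int));
int *displs = malloc(size * sizeof(int));
int rows_per_rank = NUM_ROWS_A / size;
int leftover = NUM_ROWS_A % size;
int row = 0;
for (i = 0; i < size; i++) {
    int rows = rows_per_rank + (i < leftover ? 1 : 0); // spread the remainder over the first ranks
    sendcounts[i] = rows * NUM_COLUMNS_A;  // counts in doubles, not rows
    displs[i] = row * NUM_COLUMNS_A;
    row += rows;
}
low_bound = displs[rank] / NUM_COLUMNS_A;
upper_bound = low_bound + sendcounts[rank] / NUM_COLUMNS_A;
void *recvbuf = (rank == 0) ? MPI_IN_PLACE : (void *)&mat_a[low_bound][0];
MPI_Scatterv(mat_a, sendcounts, displs, MPI_DOUBLE,
             recvbuf, sendcounts[rank], MPI_DOUBLE, 0, MPI_COMM_WORLD);
Receiving into &mat_a[low_bound][0] keeps the original indexing, so the triple loop over low_bound..upper_bound works unchanged.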
MPI_Scatter() is a collective operation, like MPI_Bcast(), which brings us to your next problem. In your original code you have this:
//rank 0:
for (i = 1; i < size; i++ ) {
//...
MPI_Isend();
MPI_Isend();
MPI_Isend();
}
MPI_Bcast();
// other ranks:
MPI_Bcast();
MPI_Recv();
MPI_Recv();
MPI_Recv();
That interleaving of collective and point-to-point communications can be very dangerous, and can lead to deadlock. It's unnecessary here; you should move the Bcast to after the Scatters and the Recv (there's only one Recv now). That leaves your worker-task code looking like:
MPI_Scatter(NULL, 1, MPI_INT, &low_bound, 1, MPI_INT, 0, MPI_COMM_WORLD);
MPI_Scatter(NULL, 1, MPI_INT, &upper_bound, 1, MPI_INT, 0, MPI_COMM_WORLD);
MPI_Recv(&mat_a[low_bound][0], (upper_bound - low_bound) * NUM_COLUMNS_A, MPI_DOUBLE, 0, MASTER_TO_SLAVE_TAG + 2, MPI_COMM_WORLD, &status);
MPI_Bcast(&mat_b, NUM_ROWS_B*NUM_COLUMNS_B, MPI_DOUBLE, 0, MPI_COMM_WORLD);
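For completeness, a rough sketch of what the matching order on rank 0 might look like (my assumption; it reuses the requests, low_bounds, and upper_bounds arrays from above), since collectives must be called in the same order on every rank:
// Rank 0 (sketch): same collective order as the workers, with the
// non-blocking sends of the [A] blocks in between.
// low_bounds[0]/upper_bounds[0] are unused unless rank 0 takes a share of the rows.
MPI_Scatter(low_bounds, 1, MPI_INT, &low_bound, 1, MPI_INT, 0, MPI_COMM_WORLD);
MPI_Scatter(upper_bounds, 1, MPI_INT, &upper_bound, 1, MPI_INT, 0, MPI_COMM_WORLD);
for (i = 1; i < size; i++)
    MPI_Isend(&mat_a[low_bounds[i]][0], (upper_bounds[i] - low_bounds[i]) * NUM_COLUMNS_A,
              MPI_DOUBLE, i, MASTER_TO_SLAVE_TAG + 2, MPI_COMM_WORLD, &(requests[i]));
MPI_Bcast(&mat_b, NUM_ROWS_B*NUM_COLUMNS_B, MPI_DOUBLE, 0, MPI_COMM_WORLD);
MPI_Waitall(size - 1, &(requests[1]), MPI_STATUSES_IGNORE);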
So that gets rid of most of the correctness issues, although I'd still suggest using scatter to distribute the A array, and then using rank 0 to do its 'fair share' of computation while waiting for the worker tasks. (This also has the advantage that your program will work for size=1.) So now let's look at the performance issues.
For fixed problem size, your program has to:
- send each worker its row bounds and its block of rows of [A]
- broadcast all of [B] to every worker
- receive each worker's block of rows of [C]

and each task has to:
- receive its bounds, its rows of [A], and all of [B]
- compute its block of rows of [C] and send it back
It's easy to see that the amount of computation each rank has to do goes down with the number of processes as 1/(P-1), but the amount of communication work goes up (as P or lg P, depending). At some point those cross over, and running on more processors just slows things down. So where is that point?
Doing a quick scaling test on a single 8-core Nehalem node and using IPM to get a simple count of where time is being spent, we have:
worker | running |         | MPI
tasks  | time    | Speedup | time
-------+---------+---------+-------
   1   | 90.85s  |    -    | 45.5s
   2   | 45.75s  |  1.99x  | 15.4s
   4   | 23.42s  |  3.88x  | 4.93s
   6   | 15.75s  |  5.76x  | 2.51s
This is actually not too bad; the MPI time is being spent almost entirely in the MPI_Recv(), which on-node represents the cost of copying the matrix pieces and, for the rank 0 process, waiting for the results to start coming back from the worker tasks. This suggests that having rank 0 do some work, and replacing the linear loop over receives with a gather operation, would be useful optimizations.
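A sketch of what that could look like, assuming the Scatterv-style row distribution from earlier (so every rank, rank 0 included, owns rows low_bound through upper_bound, and the sendcounts/displs arrays are still around):
// Sketch: every rank computes its own block of rows of [C] ...
for (i = low_bound; i < upper_bound; i++)
    for (j = 0; j < NUM_COLUMNS_B; j++)
        for (k = 0; k < NUM_ROWS_B; k++)
            mat_result[i][j] += mat_a[i][k] * mat_b[k][j];

// ... and the blocks are collected on rank 0 with a single MPI_Gatherv.
// Counts/displacements for [C] are derived from those used for [A].
int *recvcounts = malloc(size * sizeof(int));
int *rdispls = malloc(size * sizeof(int));
for (i = 0; i < size; i++) {
    recvcounts[i] = (sendcounts[i] / NUM_COLUMNS_A) * NUM_COLUMNS_B;
    rdispls[i] = (displs[i] / NUM_COLUMNS_A) * NUM_COLUMNS_B;
}
void *sendbuf = (rank == 0) ? MPI_IN_PLACE : (void *)&mat_result[low_bound][0];
MPI_Gatherv(sendbuf, (upper_bound - low_bound) * NUM_COLUMNS_B, MPI_DOUBLE,
            mat_result, recvcounts, rdispls, MPI_DOUBLE, 0, MPI_COMM_WORLD);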
Naturally, as you go off-node or to larger number of processors, the communications cost will continue to go up, and scaling will deteriorate.
More minor points:
First, master-slave is generally a pretty poor way to tackle tightly coupled numerical problems with simple load balancing like matrix multiplication. But I'll assume this is just a learning-MPI exercise and leave it at that. Note that of course the correct way to do an MPI-based matrix multiplication would be to use existing matrix multiplication libraries like SCALAPACK, Eigen, etc.
Second, the heavy use of global variables is very unhelpful in general, but that goes beyond the scope of this question. I'd also note that NUM_COLUMNS_A is necessarily equal to NUM_ROWS_B, so you don't need both.
Upvotes: 4
Reputation: 95449
This is actually not terribly surprising. The more workers you have, the more communication overhead you have (dividing up the work, aggregating the results back), so there is often a sweet spot where you have enough workers to benefit from parallelization, but not so many that communication overhead becomes an issue. As you increase the number of cores, you get diminishing returns from making each piece of work smaller while the communication overhead grows. This is why, when writing parallel applications, a lot of effort needs to go into measuring which number of workers yields the best performance, and into designing communication structures that minimize overhead.
Upvotes: 5
Reputation: 44696
When splitting work across processes, you need to balance the time taken to send data back and forth against the computation time saved. In your case, I would guess the chunks of work you are sending take longer to transmit than they would take to calculate locally.
Try to share bigger chunks of work with the other processes.
Upvotes: 1