Reputation: 23
I have the following code. I am trying to compute some statistics for a vector in an MPI environment, where the overall problem is broken down and computed by the other processes.
The problem is that when I run the code, everything works fine until the very end, after MPI_Finalize, where the program hangs. I compile with mpicc -o ex2 ex2.c on Ubuntu Linux in VirtualBox and run the executable with mpirun -np 4 ./ex2.
The terminal displays:
[proxy:0@papa-VirtualBox] HYDU_sock_write (lib/utils/sock.c:250): write error (Broken pipe)
[proxy:0@papa-VirtualBox] HYD_pmcd_pmip_control_cmd_cb (proxy/pmip_cb.c:525): unable to write to downstream stdin
[proxy:0@papa-VirtualBox] HYDT_dmxu_poll_wait_for_event (lib/tools/demux/demux_poll.c:76): callback returned error status
[proxy:0@papa-VirtualBox] main (proxy/pmip.c:122): demux engine error waiting for event
[mpiexec@papa-VirtualBox] control_cb (mpiexec/pmiserv_cb.c:280): assert (!closed) failed
[mpiexec@papa-VirtualBox] HYDT_dmxu_poll_wait_for_event (lib/tools/demux/demux_poll.c:76): callback returned error status
[mpiexec@papa-VirtualBox] HYD_pmci_wait_for_completion (mpiexec/pmiserv_pmci.c:173): error waiting for event
[mpiexec@papa-VirtualBox] main (mpiexec/mpiexec.c:260): process manager error waiting for completion
Can anyone help?
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>
#include <math.h>

int main(int argc, char *argv[])
{
    int RANK, Size, n, rep = 1;
    int *X = NULL;
    double Mean, variance, Range;
    int Min, Max;

    // Initializing the MPI environment
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &RANK);
    MPI_Comm_size(MPI_COMM_WORLD, &Size);

    while (rep == 1)
    {
        if (RANK == 0)
        {
            // Input for the size of the vector
            printf("Give me the size of the vector (n): ");
            // Flushing stdout because the MPI environment is a little bit buggy with it
            fflush(stdout);
            scanf("%d", &n);
            // Inputting the elements of the vector
            X = malloc(n * sizeof(int));
            printf("Give me the %d elements of the vector:\n", n);
            for (int i = 0; i < n; i++) {
                scanf("%d", &X[i]);
            }
        }
        // Broadcasting the size of the vector to the rest of the ranks
        MPI_Bcast(&n, 1, MPI_INT, 0, MPI_COMM_WORLD);
        // Memory allocation for the vector in the rest of the ranks
        if (RANK != 0)
        {
            X = malloc(n * sizeof(int));
        }
        // Broadcasting the vector to all the other processes
        MPI_Bcast(X, n, MPI_INT, 0, MPI_COMM_WORLD);
        // Determining the local size for each process (local_data gets freed at the end of the loop)
        int local_size = n / Size + (RANK < n % Size ? 1 : 0);
        int *local_data = malloc(local_size * sizeof(int));
        int *sendcounts = NULL, *displs = NULL;
        if (RANK == 0)
        {
            // Preparing the sendcounts and displacements before scattering them
            sendcounts = malloc(Size * sizeof(int));
            displs = malloc(Size * sizeof(int));
            for (int i = 0; i < Size; i++) {
                sendcounts[i] = n / Size + (i < n % Size ? 1 : 0);
                displs[i] = i * (n / Size) + (i < n % Size ? i : n % Size);
            }
        }
        // Scattering the data
        MPI_Scatterv(X, sendcounts, displs, MPI_INT, local_data, local_size, MPI_INT, 0, MPI_COMM_WORLD);
        // Calculating the local sum, min, max
        int local_sum = 0, local_min = local_data[0], local_max = local_data[0];
        for (int i = 0; i < local_size; i++)
        {
            local_sum += local_data[i];
            if (local_data[i] < local_min) local_min = local_data[i];
            if (local_data[i] > local_max) local_max = local_data[i];
        }
        // MPI_Reduce to find the global min, max and to get the global sum
        int global_sum, global_min, global_max;
        MPI_Reduce(&local_sum, &global_sum, 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD);
        MPI_Reduce(&local_min, &global_min, 1, MPI_INT, MPI_MIN, 0, MPI_COMM_WORLD);
        MPI_Reduce(&local_max, &global_max, 1, MPI_INT, MPI_MAX, 0, MPI_COMM_WORLD);
        // Calculation for Mean
        if (RANK == 0)
        {
            Mean = (double)global_sum / n;
            Min = global_min;
            Max = global_max;
            Range = Max - Min;
            printf("Mean : %.2f, Min : %d, Max : %d, Range : %.2f\n", Mean, Min, Max, Range);
        }
        // Broadcasting Mean, Min, Max to the rest of the processes in the MPI environment
        MPI_Bcast(&Mean, 1, MPI_DOUBLE, 0, MPI_COMM_WORLD);
        MPI_Bcast(&Min, 1, MPI_INT, 0, MPI_COMM_WORLD);
        MPI_Bcast(&Max, 1, MPI_INT, 0, MPI_COMM_WORLD);
        MPI_Bcast(&Range, 1, MPI_DOUBLE, 0, MPI_COMM_WORLD);
        // Calculating the local variance and the new vector Δ
        double local_variance = 0.0;
        double *local_D = malloc(local_size * sizeof(double));
        for (int i = 0; i < local_size; i++)
        {
            local_variance += (local_data[i] - Mean) * (local_data[i] - Mean);
            local_D[i] = ((local_data[i] - Min) / Range) * 100.0;
        }
        // Using MPI_Reduce to get the global variance
        double global_variance;
        MPI_Reduce(&local_variance, &global_variance, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);
        if (RANK == 0) {
            global_variance /= n;
            printf("Variance : %.2f\n", global_variance);
        }
        // Using MPI_Gatherv to gather the vector Δ to rank 0
        double *D = NULL;
        if (RANK == 0) D = malloc(n * sizeof(double));
        MPI_Gatherv(local_D, local_size, MPI_DOUBLE, D, sendcounts, displs, MPI_DOUBLE, 0, MPI_COMM_WORLD);
        // Finding the max of Δ and its index
        double local_max_D = local_D[0];
        int local_max_D_index = RANK * (n / Size) + (RANK < n % Size ? RANK : n % Size);
        for (int i = 0; i < local_size; i++)
        {
            if (local_D[i] > local_max_D)
            {
                local_max_D = local_D[i];
                local_max_D_index = RANK * (n / Size) + (RANK < n % Size ? RANK : n % Size) + i;
            }
        }
        struct {
            double value;
            int RANK;
        } local_result = {local_max_D, local_max_D_index}, global_result;
        MPI_Reduce(&local_result, &global_result, 1, MPI_DOUBLE_INT, MPI_MAXLOC, 0, MPI_COMM_WORLD);
        // Printing max Δ
        if (RANK == 0)
        {
            printf("Max Δ : %.2f at index %d (value: %d)\n", global_result.value, global_result.RANK, X[global_result.RANK]);
        }
        // Calculating prefix sums
        int prefix_sum;
        MPI_Scan(&local_sum, &prefix_sum, 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD);
        printf("Prefix sum: %d\n", prefix_sum);
        // Freeing allocated memory
        free(local_data);
        free(local_D);
        if (RANK == 0) {
            free(X);
            free(sendcounts);
            free(displs);
            free(D);
        }
        // Menu for looping the process
        if (RANK == 0) {
            printf("Would you like the program to run again ?\n1:Yes\n0:No\n");
            fflush(stdout);
            scanf("%d", &rep);
        }
    }
    MPI_Finalize();
    return 0;
}
Upvotes: 1
Views: 66
Reputation: 946
Only rank 0 will ever leave the while loop. Therefore, all other ranks will wait in the first MPI_Bcast call, while rank 0 waits in MPI_Finalize for the other ranks. Adding MPI_Bcast(&rep, 1, MPI_INT, 0, MPI_COMM_WORLD); at the end of the while loop allows the program to exit cleanly.
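For example, the end of the loop could then look like this (a sketch built from the code in the question, with everything else unchanged):

// Menu for looping the process
if (RANK == 0) {
    printf("Would you like the program to run again ?\n1:Yes\n0:No\n");
    fflush(stdout);
    scanf("%d", &rep);
}
// Broadcast the updated rep so every rank sees it and leaves the loop together
MPI_Bcast(&rep, 1, MPI_INT, 0, MPI_COMM_WORLD);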
What is the point of broadcasting the vector X if each process only works on local_data? The other processes actually leak this array, since they malloc memory for X but never free it.
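One way to address both points is to drop the broadcast of X and the matching malloc on the non-root ranks entirely; MPI_Scatterv only reads the send buffer on the root. A minimal sketch of that part of the loop, using the names from the question:

// Broadcasting the size of the vector to the rest of the ranks
MPI_Bcast(&n, 1, MPI_INT, 0, MPI_COMM_WORLD);
// No broadcast of X and no copy of it on the non-root ranks:
// MPI_Scatterv only dereferences X, sendcounts and displs on the root
int local_size = n / Size + (RANK < n % Size ? 1 : 0);
int *local_data = malloc(local_size * sizeof(int));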
It is possible to calculate the variance without broadcasting the mean. Instead, you can calculate the global sum of squared values (ssq): variance = ssq/n - Mean*Mean.
// Calculating the local sums, min, max
int local_sum = 0, local_min = local_data[0], local_max = local_data[0], local_square_sum = 0;
for (int i = 0; i < local_size; i++)
{
    local_sum += local_data[i];
    local_square_sum += local_data[i] * local_data[i];
    if (local_data[i] < local_min) local_min = local_data[i];
    if (local_data[i] > local_max) local_max = local_data[i];
}
// MPI_Reduce to find the global min, max and to get the global sums
int global_sum, global_min, global_max, global_square_sum;
double global_variance;
MPI_Reduce(&local_sum, &global_sum, 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD);
MPI_Reduce(&local_square_sum, &global_square_sum, 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD);
MPI_Reduce(&local_min, &global_min, 1, MPI_INT, MPI_MIN, 0, MPI_COMM_WORLD);
MPI_Reduce(&local_max, &global_max, 1, MPI_INT, MPI_MAX, 0, MPI_COMM_WORLD);
// Calculation for Mean and variance
if (RANK == 0)
{
    Mean = (double)global_sum / n;
    global_variance = (double)global_square_sum / n - Mean * Mean;
    Min = global_min;
    Max = global_max;
    Range = Max - Min;
    printf("Mean : %.2f, Min : %d, Max : %d, Range : %.2f\n", Mean, Min, Max, Range);
    printf("Variance : %.2f\n", global_variance);
}
You might need to change the type of *_square_sum to long long.
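For instance, a sketch of just the affected lines with 64-bit accumulation (assuming MPI_LONG_LONG as the matching MPI datatype):

long long local_square_sum = 0, global_square_sum;
for (int i = 0; i < local_size; i++)
{
    // accumulate in 64-bit to avoid overflowing int for large inputs
    local_square_sum += (long long)local_data[i] * local_data[i];
}
MPI_Reduce(&local_square_sum, &global_square_sum, 1, MPI_LONG_LONG, MPI_SUM, 0, MPI_COMM_WORLD);
if (RANK == 0)
{
    global_variance = (double)global_square_sum / n - Mean * Mean;
}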
Upvotes: 1