Antony

Reputation: 23

MPI Program Hangs after MPI_Finalize

I have the following code. I am trying to calculate some statistics for a vector in an MPI environment, where the overall problem is broken down and computed by the other processes.

The problem is that when I run the code, everything works fine until the very end, after MPI_Finalize, where the program hangs. I compile with mpicc -o ex2 ex2.c on Ubuntu Linux in VirtualBox and run the executable with mpirun -np 4 ./ex2.

The terminal displays:

[proxy:0@papa-VirtualBox] HYDU_sock_write (lib/utils/sock.c:250): write error (Broken pipe)
[proxy:0@papa-VirtualBox] HYD_pmcd_pmip_control_cmd_cb (proxy/pmip_cb.c:525): unable to write to downstream stdin
[proxy:0@papa-VirtualBox] HYDT_dmxu_poll_wait_for_event (lib/tools/demux/demux_poll.c:76): callback returned error status
[proxy:0@papa-VirtualBox] main (proxy/pmip.c:122): demux engine error waiting for event
[mpiexec@papa-VirtualBox] control_cb (mpiexec/pmiserv_cb.c:280): assert (!closed) failed
[mpiexec@papa-VirtualBox] HYDT_dmxu_poll_wait_for_event (lib/tools/demux/demux_poll.c:76): callback returned error status
[mpiexec@papa-VirtualBox] HYD_pmci_wait_for_completion (mpiexec/pmiserv_pmci.c:173): error waiting for event
[mpiexec@papa-VirtualBox] main (mpiexec/mpiexec.c:260): process manager error waiting for completion

Can anyone help?

#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>
#include <math.h>


int main(int argc, char *argv[])
{
   int RANK, Size, n,rep = 1;
   int *X = NULL;
   double Mean, variance, Range;
   int Min, Max;
   // Initializing the MPI environment
   MPI_Init(&argc, &argv);
   MPI_Comm_rank(MPI_COMM_WORLD, &RANK);
   MPI_Comm_size(MPI_COMM_WORLD, &Size);


 while(rep == 1)
 {


   if (RANK == 0)
   {
       // Input for the size of vector
       printf("Give me the size of the vector (n): ");
       // Flushing the stdout because the MPI environment is a little bit buggy with it
       fflush(stdout);
       scanf("%d", &n);


       // Inputting the elements of the vector
       X = malloc(n * sizeof(int));
       printf("Give me the %d elements of the vector:\n", n);
       for (int i = 0; i < n; i++) {
           scanf("%d", &X[i]);
       }
   }


   // Broadcasting the size of the vector to the rest of ranks
   MPI_Bcast(&n, 1, MPI_INT, 0, MPI_COMM_WORLD);


   // Memory allocation for the vector in the rest of the ranks
   if (RANK != 0)
   {
       X = malloc(n * sizeof(int));
   }


   // Broadcasting the vector to all the other processes
   MPI_Bcast(X, n, MPI_INT, 0, MPI_COMM_WORLD);


   // Determining the local size for each process (it gets freed at line 140)
   int local_size = n / Size + (RANK < n % Size ? 1 : 0);
   int *local_data = malloc(local_size * sizeof(int));
   int *sendcounts = NULL, *displs = NULL;


   if (RANK == 0)
   {
       // Preparing the sendcounts and displacements before scattering them
       sendcounts = malloc(Size * sizeof(int));
       displs = malloc(Size * sizeof(int));
       for (int i = 0; i < Size; i++) {
           sendcounts[i] = n / Size + (i < n % Size ? 1 : 0);
           displs[i] = i * (n / Size) + (i < n % Size ? i : n % Size);
       }
   }


   // Scattering the data
   MPI_Scatterv(X, sendcounts, displs, MPI_INT, local_data, local_size, MPI_INT, 0, MPI_COMM_WORLD);


   // Calculating the  local sums, min, max
   int local_sum = 0, local_min = local_data[0], local_max = local_data[0];
   for (int i = 0; i < local_size; i++)
   {
       local_sum += local_data[i];
       if (local_data[i] < local_min) local_min = local_data[i];
       if (local_data[i] > local_max) local_max = local_data[i];
   }


   // MPI_Reduce to find the global min, max and to get the global sum
   int global_sum, global_min, global_max;
   MPI_Reduce(&local_sum, &global_sum, 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD);
   MPI_Reduce(&local_min, &global_min, 1, MPI_INT, MPI_MIN, 0, MPI_COMM_WORLD);
   MPI_Reduce(&local_max, &global_max, 1, MPI_INT, MPI_MAX, 0, MPI_COMM_WORLD);


   // Calculation for Mean
   if (RANK == 0)
   {
       Mean = (double)global_sum / n;
       Min = global_min;
       Max = global_max;
       Range = Max - Min;
       printf("Mean : %.2f, Min : %d, Max : %d, Range : %.2f\n", Mean, Min, Max, Range);
   }


   // Broadcasting Mean, Min, Max to the rest of the processes in the MPI environment
   MPI_Bcast(&Mean, 1, MPI_DOUBLE, 0, MPI_COMM_WORLD);
   MPI_Bcast(&Min, 1, MPI_INT, 0, MPI_COMM_WORLD);
   MPI_Bcast(&Max, 1, MPI_INT, 0, MPI_COMM_WORLD);
   MPI_Bcast(&Range, 1, MPI_DOUBLE, 0, MPI_COMM_WORLD);


   // Calculating the local variance and the new vector Δ
   double local_variance = 0.0;
   double *local_D = malloc(local_size * sizeof(double));
   for (int i = 0; i < local_size; i++)
   {
       local_variance += (local_data[i] - Mean) * (local_data[i] - Mean);
       local_D[i] = ((local_data[i] - Min) / Range) * 100.0;
   }


   // Using MPI_Reduce to get the global variance
   double global_variance;
   MPI_Reduce(&local_variance, &global_variance, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);
   if (RANK == 0) {
       global_variance /= n;
       printf("Variance : %.2f\n", global_variance);
   }


   // Using Gather to gather the vector Δ to rank 0
   double *D = NULL;
   if (RANK == 0) D = malloc(n * sizeof(double));
   MPI_Gatherv(local_D, local_size, MPI_DOUBLE, D, sendcounts, displs, MPI_DOUBLE, 0, MPI_COMM_WORLD);


   // Finding the max of Δ and its index
   double local_max_D = local_D[0];
   int local_max_D_index = RANK * (n / Size) + (RANK < n % Size ? RANK : n % Size);
   for (int i = 0; i < local_size; i++)
   {
       if (local_D[i] > local_max_D)
       {
           local_max_D = local_D[i];
           local_max_D_index = RANK * (n / Size) + (RANK < n % Size ? RANK : n % Size) + i;
       }
   }


   struct {
       double value;
       int RANK;
   } local_result = {local_max_D, local_max_D_index}, global_result;


   MPI_Reduce(&local_result, &global_result, 1, MPI_DOUBLE_INT, MPI_MAXLOC, 0, MPI_COMM_WORLD);


   // Printing  max Δ
   if (RANK == 0)
   {
       printf("Max Δ : %.2f at index %d (value: %d)\n", global_result.value, global_result.RANK, X[global_result.RANK]);
   }


   // Calculating prefix sums
   int prefix_sum;
   MPI_Scan(&local_sum, &prefix_sum, 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD);
   printf("Prefix sum: %d\n", prefix_sum);


   // Freeing allocated memory
   free(local_data);
   free(local_D);


   if (RANK == 0) {
       free(X);
       free(sendcounts);
       free(displs);
       free(D);
   }


   // Menu for looping the process
   if (RANK == 0) {
       printf("Would you like the program to run again ?\n1:Yes\n0:No\n");
       fflush(stdout);
       scanf("%d",&rep);
   }
 }


   MPI_Finalize();
   return 0;
  
}

Upvotes: 1

Views: 66

Answers (1)

Joachim

Reputation: 946

Only rank 0 will ever leave the while loop: the other ranks never see the updated rep, so they start another iteration and block in the first MPI_Bcast call, while rank 0 waits in MPI_Finalize for them. Adding MPI_Bcast(&rep, 1, MPI_INT, 0, MPI_COMM_WORLD); at the end of the while loop lets the program exit cleanly.
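For reference, the end of the loop would then look roughly like this (reusing the names from the question's code):

   // Menu for looping the process
   if (RANK == 0) {
       printf("Would you like the program to run again ?\n1:Yes\n0:No\n");
       fflush(stdout);
       scanf("%d", &rep);
   }

   // Let every rank see rank 0's decision so they all leave the loop together
   MPI_Bcast(&rep, 1, MPI_INT, 0, MPI_COMM_WORLD);
 }

   MPI_Finalize();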

What is the point of broadcasting the vector X if each process only works on local_data? The other processes actually leak this array: they malloc memory for X but never free it.
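If you keep the broadcast of X, a minimal fix is to free it on every rank rather than only on rank 0, for example:

   // X was malloc'd on every rank, so every rank has to free it
   free(X);
   X = NULL;

   if (RANK == 0) {
       free(sendcounts);
       free(displs);
       free(D);
   }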

It is possible to calculate the variance without broadcasting the mean. Instead, you can compute the global sum of squared values (ssq) and use variance = ssq/n - Mean*Mean.

   // Calculating the  local sums, min, max
   int local_sum = 0, local_min = local_data[0], local_max = local_data[0], local_square_sum = 0;
   for (int i = 0; i < local_size; i++)
   {
       local_sum += local_data[i];
       local_square_sum += local_data[i]*local_data[i];
       if (local_data[i] < local_min) local_min = local_data[i];
       if (local_data[i] > local_max) local_max = local_data[i];
   }


   // MPI_Reduce to find the global min, max and to get the global sum
   int global_sum, global_min, global_max, global_square_sum;
   double global_variance;
   MPI_Reduce(&local_sum, &global_sum, 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD);
   MPI_Reduce(&local_square_sum, &global_square_sum, 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD);
   MPI_Reduce(&local_min, &global_min, 1, MPI_INT, MPI_MIN, 0, MPI_COMM_WORLD);
   MPI_Reduce(&local_max, &global_max, 1, MPI_INT, MPI_MAX, 0, MPI_COMM_WORLD);


   // Calculation for Mean
   if (RANK == 0)
   {
       Mean = (double)global_sum / n;
       global_variance = (double)global_square_sum / n - Mean * Mean;
       Min = global_min;
       Max = global_max;
       Range = Max - Min;
       printf("Mean : %.2f, Min : %d, Max : %d, Range : %.2f\n", Mean, Min, Max, Range);
       printf("Variance : %.2f\n", global_variance);
   }

You might need to change the type of the *_square_sum variables to long long, since the sum of squares can overflow an int.
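A sketch of what that would look like (note that the MPI datatype in the reduction has to change together with the C type):

   long long local_square_sum = 0, global_square_sum;
   for (int i = 0; i < local_size; i++)
       local_square_sum += (long long)local_data[i] * local_data[i];

   // MPI_LONG_LONG matches the C long long type
   MPI_Reduce(&local_square_sum, &global_square_sum, 1, MPI_LONG_LONG, MPI_SUM, 0, MPI_COMM_WORLD);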

Upvotes: 1
