JuMoGar

Reputation: 1760

How to parallelize a C algorithm with OpenMP correctly?

I am starting out with OpenMP and I have to parallelize some C code. I have tried many approaches and have never achieved the desired results.

I will show you my parallelization. If you can advise me on how to parallelize it correctly, I will be very grateful.

I have working (but poorly performing) parallelized code. I have marked with UPPERCASE comments the places where performance is lost (I know where the code loses performance because I have tested it extensively):

#include <File.h>
#include <time.h>
#include <math.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <omp.h>

#define sqr(x) ((x)*(x))
#define MAX_ITER_NO_IMPR 10



void fail(const char * str) {
    fprintf(stderr,"%s", str);
    exit(-1);
}

/**
* calc_distance calculates the distance between a given point and a cluster
* @param int -dim: number of columns (variables) in the data set to be classified
* @param float * -: first array for the distance calculation
* @param float * -: second array for the distance calculation
* @return float: squared Euclidean distance between the two vectors
*/
float calc_distance(int dim, float *p1, float  *p2) {
    float distance_sq_sum = 0;

    for (int i = 0; i < dim; ++i)
        distance_sq_sum += sqr(p1[i] - p2[i]);

    return distance_sq_sum;  
}

/**
* calc_all_distances computes the Euclidean distances between the centroids and the dataset points.
* @param int -dim: number of columns (variables) in the data set to be classified
* @param int -n: number of rows (points) in the data set to be classified   
* @param int -k: number of clusters to be calculated
* @param float * -X: dataset to be classified
* @param float * -centroid: prototypes of each cluster. 
* @param float * -distance_output[n][k]: contains the distance between all elements in the dataset and all clusters
* @return void
*/
void calc_all_distances(int dim, int n, int k, float *X, float *centroid, float *distance_output) {
    #pragma omp parallel for simd collapse(2)
    for (int i = 0; i < n; ++i) // for each point
        for (int j = 0; j < k; ++j) // for each cluster 
            // calculate distance between point and cluster centroid
            distance_output[i*k+j] = calc_distance(dim, &X[i*dim], &centroid[j*dim]);
}


/**
* calc_total_distance calculates the overall clustering distance.
* @param int -dim: number of columns (variables) in the data set to be classified
* @param int -n: number of rows (points) in the data set to be classified   
* @param int -k: number of clusters to be calculated
* @param float * -X: dataset to be classified
* @param float * -centroid: prototypes of each cluster. 
* @param int * - cluster_assignment_index: current cluster assignment to each point
* @return float: overall distance. This is what the algorithm tries to minimize.
*/
float calc_total_distance(int dim, int n, int k, float *X, float *centroids, int *cluster_assignment_index) {
    // NOTE: a point with cluster assignment -1 is ignored
    float tot_D = 0;

    // for every point
    #pragma omp parallel for simd reduction(+:tot_D)
// HERE PERFORMANCE IS LOST
    for (int i = 0; i < n; ++i) {
        // which cluster is it in?
        int active_cluster = cluster_assignment_index[i];

        // sum distance
        if (active_cluster != -1)
            tot_D += calc_distance(dim, &X[i*dim], &centroids[active_cluster*dim]);
    }

    return tot_D;
}


/**
* choose_all_clusters_from_distances obtains the closest cluster for each point.  
* @param int -dim: number of columns (variables) in the data set to be classified
* @param int -n: number of rows (points) in the data set to be classified   
* @param int -k: number of clusters to be calculated
* @param float * -distance_array[n][k]: contains the distance between all elements in the dataset and all clusters
* @param int* - cluster_assignment_index contains the assigned cluster to each point 
* @return void
*/
void choose_all_clusters_from_distances(int dim, int n, int k, float *distance_array, int *cluster_assignment_index) {
    // for each point
    #pragma omp parallel for simd
    for (int i = 0; i < n; ++i) {
        int best_index = -1;
        float closest_distance = INFINITY;

        // for each cluster
//  #pragma omp private(best_index, closest_distance)
// SHOULDN'T best_index AND closest_distance BE PRIVATE?
        for (int j = 0; j < k; ++j) {
            // distance between point and cluster centroid
            float cur_distance = distance_array[i*k+j];
            if (cur_distance < closest_distance) {
                best_index = j;
                closest_distance = cur_distance;
            }
        }

        // record in array
        cluster_assignment_index[i] = best_index;
    }
}

/**
* calc_cluster_centroids calculates the new prototypes of all clusters 
* @param int -dim: number of columns (variables) in the data set to be classified
* @param int -n: number of rows (points) in the data set to be classified   
* @param int -k: number of clusters to be calculated
* @param float * -X: dataset to be classified
* @param int * -cluster_assignment_index: current cluster assignment for each point
* @param float * -new_cluster_centroid: it is the output with the new cluster prototypes
*/

void calc_cluster_centroids(int dim, int n, int k, float *X, int *cluster_assignment_index, float *new_cluster_centroid) {
    int * cluster_member_count = (int *) calloc (k, sizeof(int));

    // sum all points
    // for every point
    #pragma omp parallel for simd
// HERE PERFORMANCE IS LOST
    for (int i = 0; i < n; ++i) {
        // which cluster is it in?
        int active_cluster = cluster_assignment_index[i];

        // update count of members in that cluster
        ++cluster_member_count[active_cluster];

        // sum point coordinates for finding centroid
        for (int j = 0; j < dim; ++j)
            new_cluster_centroid[active_cluster*dim + j] += X[i*dim + j];
    }


    // now divide each coordinate sum by number of members to find mean/centroid
    // for each cluster
    #pragma omp for
    for (int i = 0; i < k; ++i) {
        if (cluster_member_count[i] == 0) {
            //printf("WARNING: Empty cluster %d! \n", i);
        //break;
            // SIGNAL TO CANCEL THE LOOP
        #pragma omp cancel for
        }
    // CANCEL THE EXECUTION OF ALL THREADS
    #pragma omp cancellation point for

        // for each dimension
    #pragma omp simd 
// HERE PERFORMANCE IS BETTER
        for (int j = 0; j < dim; ++j)
            new_cluster_centroid[i*dim + j] /= cluster_member_count[i];  /// XXXX will divide by zero here for any empty clusters!
    }
}

/**
* get_cluster_member_count counts the members of each cluster
* @param int -n: number of rows (points) in the data set to be classified   
* @param int -k: number of clusters to be calculated
* @param int* - cluster_assignment_index contains the assigned cluster to each point 
* @param int * -cluster_member_count: count members of each cluster 
*/
void get_cluster_member_count(int n, int k, int *cluster_assignment_index, int *cluster_member_count) {
    // count members of each cluster    
    #pragma omp parallel for
    for (int i = 0; i < n; ++i)
    #pragma omp atomic update
        ++cluster_member_count[cluster_assignment_index[i]];
}


/**
* Visualize the number of members for all clusters
*/
void cluster_diag(int dim, int n, int k, float *X, int *cluster_assignment_index, float *cluster_centroid) {
    int * cluster_member_count = (int *) calloc (k, sizeof(int));

    get_cluster_member_count(n, k, cluster_assignment_index, cluster_member_count);

    printf("  Final clusters\n");
    #pragma omp parallel for ordered
// HERE PERFORMANCE IS LOST
    for (int i = 0; i < k; ++i) { 
    #pragma omp ordered
        printf("\tcluster %d:  members: %8d, for the centroid (", i, cluster_member_count[i]);
        for (int j = 0; j < dim; ++j)  
            #pragma omp ordered 
            printf ("%f, ", cluster_centroid[i*dim + j]);
    #pragma omp ordered
        printf (")\n");
    }
}

void copy_assignment_array(int n, int *src, int *tgt) {
    #pragma omp parallel for simd
    for (int i = 0; i < n; ++i)
        tgt[i] = src[i];
}  


int assignment_change_count(int n, int a[], int b[]) {
    int change_count = 0;

    #pragma omp parallel for reduction(+:change_count)
    for (int i = 0; i < n; ++i)
        if (a[i] != b[i])
            ++change_count;

    return change_count;
}


/*
* This is C source code for a simple implementation of the popular k-means clustering algorithm. 
* It is based on the implementation in Matlab, which was in turn based on G. A. F. Seber,
* Multivariate Observations, 1984, and H. Späth, Cluster Dissection and Analysis: Theory, FORTRAN Programs, Examples.
* @param int -dim: number of columns (variables) in the data set to be classified (dimension of data)
* @param float * -X: dataset to be classified (pointer to data)
* @param int -n: number of rows (points) in the data set to be classified (number of elements)
* @param int -k: number of clusters to be calculated
* @param float * -cluster_centroid: Initial clusters prototypes or centros (initial cluster centroids)
* @param int iterations -: number of iterations to be performed
* @param int * cluster_assignment_final -: Output classification
*/
void kmeans(int dim, float *X, int n, int k, float *cluster_centroid, int iterations, int *cluster_assignment_final) {
    int floatPointerSize = n * k * sizeof(float);
    int intPointerSize = n * sizeof(int);
    float *dist = (float *) malloc( floatPointerSize );
    int *cluster_assignment_cur = (int *) malloc( intPointerSize );
    int  *cluster_assignment_prev = (int *) malloc( intPointerSize );
    float *point_move_score = (float *) malloc( floatPointerSize );

    if (!dist || !cluster_assignment_cur || !cluster_assignment_prev || !point_move_score)
        fail("Error allocating dist arrays\n");

    // Initial setup. Assignment Step  
    calc_all_distances(dim, n, k, X, cluster_centroid, dist);
    choose_all_clusters_from_distances(dim, n, k, dist, cluster_assignment_cur);
    copy_assignment_array(n, cluster_assignment_cur, cluster_assignment_prev);

    // The initial quality is the one obtained from the random selection
    float prev_totD = calc_total_distance(dim, n, k, X, cluster_centroid, cluster_assignment_cur);

    int numVariations = 0;
    // UPDATE STEP
    // for (int batch=0; (batch < iterations) && (numVariations <MAX_ITER_NO_IMPR); ++batch) {

    for (int batch = 0; batch < iterations; ++batch) {
        //printf("Batch step: %d \n", batch);
        //cluster_diag(dim, n, k, X, cluster_assignment_cur, cluster_centroid);

        // update cluster centroids. Update Step
        calc_cluster_centroids(dim, n, k, X, cluster_assignment_cur, cluster_centroid);

        float totD = calc_total_distance(dim, n, k, X, cluster_centroid, cluster_assignment_cur);

        // see if we've failed to improve
        if (totD >= prev_totD){
            // failed to improve - currently solution worse than previous
            // restore old assignments
            copy_assignment_array(n, cluster_assignment_prev, cluster_assignment_cur);

            // recalc centroids
            // calc_cluster_centroids(dim, n, k, X, cluster_assignment_cur, cluster_centroid);    
            //printf("\tNegative progress made on this step - iteration completed (%.2f) \n", prev_totD-totD);
            ++numVariations; // used for the no-improvement stopping criterion
        }
        else { // We have made some improvements        
            // save previous step
            copy_assignment_array(n, cluster_assignment_cur, cluster_assignment_prev);
            // move all points to nearest cluster
            calc_all_distances(dim, n, k, X, cluster_centroid, dist);
            choose_all_clusters_from_distances(dim, n, k, dist, cluster_assignment_cur);
            //check how many assignments are different  
            //int change_count = assignment_change_count(n, cluster_assignment_cur, cluster_assignment_prev);
            //printf("\tIn the batch: %d, has changed: %d element to a different cluster with an improvement of %f \n", batch, change_count, prev_totD-totD);
            //fflush(stdout);
            prev_totD = totD;
        } 
    }

// COMMENT OUT THIS LINE TO HIDE THE RESULTS
    cluster_diag(dim, n, k, X, cluster_assignment_cur, cluster_centroid);

    // write to output array
    copy_assignment_array(n, cluster_assignment_cur, cluster_assignment_final);    

    //Free memory
    free(dist);
    free(cluster_assignment_cur);
    free(cluster_assignment_prev);
    free(point_move_score);
}           



/**
* random_init_centroid chooses random prototypes that belong to the dataset. They are points of the dataset.   
*@param float * -cluster_centro_id: chosen cluster centroids
*@param float * -dataSetMatrix: the dataset
*@param int clusters: number of clusters to be computed
*@param int rows: number of rows in the dataset, i.e. points
*@param int columns: number of columns, i.e. each point's dimension
*@return void
*/
void random_init_centroid (float * cluster_centro_id, float * dataSetMatrix, int clusters, int rows, int columns) {
   srand(time(NULL));

   for (int i=0; i<clusters; ++i) {
        int r = rand()%rows; 
        for (int j=0; j<columns;++j) {
            cluster_centro_id[i*columns+j]=dataSetMatrix[r*columns+j];
            //printf ("Los indices son  %d\n", r*columns+j);        
        }       
    }
}   



int main( int argc, char *argv[] ) {
/**/
    // CHECK THAT CANCELLATION IS ENABLED
    if( !omp_get_cancellation() )
    {
        //printf("Cancellations were not enabled, enabling cancellation and rerunning program\n");
        putenv("OMP_CANCELLATION=true");
        execv(argv[0], argv);
    }
    // CHECK THAT THE PROGRAM ALWAYS RUNS IN PARALLEL
    int numHilos = 0;
    #pragma omp parallel
    {
        if ( omp_get_thread_num() == 0 ) numHilos = omp_get_num_threads();
    }
    if (numHilos == 1) {
    //printf("Program is executing sequentially, setting 2 threads and rerunning program\n");
        putenv("OMP_NUM_THREADS=2");
        execv(argv[0], argv);
    }
/**/

    float *cluster_centroid;   // initial cluster centroids. The size is clusters x columns
    int *clustering_output;  // output
    int rows=0, columns=0, clusters=1;
    int iterations = 1000;
    float * dataSetMatrix=NULL;
    int c;                   // getopt() returns an int, so c must be an int to compare against -1 safely
    char *fileName=NULL;

    //int err=system("clear");

    while ((c = getopt (argc, argv, "v:c:f:i:h")) != -1) {
        switch (c) {
        case 'v':
            printf("K means algorithm v.1.0\n\n");
            return 0;
        case 'c':
            clusters = atoi(optarg);
            if (clusters < 1) {
                printf ("the minimum number of clusters is 1\n");
                return 0;
            }
            break;
        case 'f':
            fileName = (char *) malloc (strlen(optarg)+1);
            strcpy(fileName,optarg);
            break;
        case 'i':
            iterations = atoi (optarg);
            break;
        case 'h':
        case '?':
            printf("Usage:\trun -c <number of clusters> -f <file.txt> -i <number of iterations> [-h | -? HELP] \n");
            printf("\t<Params>\n");
            printf("\t\t-v\t\tOutput version information and exit\n");
            return 0;
        }
    }

    //printf ("..............Loading data set...............\n "); 
    // Get file size dataset
    getSizeFile( fileName, &rows, &columns );

    clustering_output = (int *) malloc (rows*sizeof(int));
    // Reserve dynamic memory for dataset matrix
    reserveDynamicMemoryForMatrix( &dataSetMatrix, rows, columns );

    // Set data in the dataset matrix
    setDataInMatrix( dataSetMatrix, fileName, rows, columns );

    //printf ("-------DataSet: \n");
    //printMatrix(dataSetMatrix, rows, columns);

    // printf ("..............Done..............\n "); 
    cluster_centroid = (float *) malloc (clusters*columns*sizeof(float));
    random_init_centroid (cluster_centroid, dataSetMatrix, clusters, rows, columns);   

    //printf (".........Initial Prototypes: ................ \n");
    //printMatrix(cluster_centroid, clusters, columns);

// COMMENT OUT THESE LINES TO HIDE THE RESULTS
    printf ("The number of instance: %d Variables: %d Clusters: %d and Iterations: %d\n", rows, columns,clusters, iterations);
//  printf ("File: %d; \tClusters: %d; \tIterations: %d\n", filename, clusters, iterations);
//    
    double ini = omp_get_wtime();
    kmeans (columns, dataSetMatrix, rows, clusters, cluster_centroid, iterations, clustering_output);  
    double fin = omp_get_wtime();
    printf ("The execution time is %lf seconds\n", fin-ini);

    // Free memory
    free (dataSetMatrix); 
    free (cluster_centroid);
    free (clustering_output); 
}

But now the problem is that it has race conditions: the sequential execution time is 9.1XXs while the parallel execution time is 90.XXXs, so performance actually decreases. Any idea why that could be?

Thank you in advance.

Upvotes: 2

Views: 824

Answers (1)

Brice

Reputation: 1580

I haven't gone through the whole code, but the general rule is to parallelize the loops that process a significant amount of data, and in most cases a single level of parallelization is sufficient. For example, using #pragma omp parallel for in calc_all_distances is fine. However, the function calc_distance is called from within that loop, so that function should not be parallelized itself.

float calc_distance(int dim, float *p1, float  *p2) {

    float distance_sq_sum = 0;
    // Serial loop as the function will be called from already parallellized block of code
    for (int i = 0; i < dim; ++i)
        distance_sq_sum += sqr(p1[i] - p2[i]);

    return distance_sq_sum;
}

void calc_all_distances(int dim, int n, int k, float *restrict X, float *restrict centroid, float *restrict distance_output) {
    // Parallelization is done there. 
    #pragma omp parallel for schedule(static) collapse(2)
    for (int i = 0; i < n; ++i) // for each point
        for (int j = 0; j < k; ++j) // for each cluster 
            // calculate distance between point and cluster centroid
            distance_output[i*k+j] = calc_distance(dim, &X[i*dim], &centroid[j*dim]);
}

The schedule(dynamic,1) clause creates significant overhead and is best used when the execution time of iterations varies significantly, so that some threads may have lots of work left while others are done and sit idle. It makes each thread request one chunk of iterations from the runtime after finishing the previous one. Here the default schedule(static), where the workload is split once and for all at the beginning of the loop, seems better since the amount of work is the same regardless of the iteration; schedule(guided) is a compromise between the two (see the sketch below).

The collapse(2) clause parallelizes both loops together. This is useful if there are fewer points than OpenMP threads, in which case parallelizing over the first loop alone would not be optimal, but it may degrade performance otherwise.

The restrict keyword tells the compiler that the three pointers point to non-overlapping memory (essentially, that writing into distance_output will not change the values of X or centroid), so it may optimize the code based on that assumption.
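To make the scheduling trade-off concrete, here is a minimal sketch with two hypothetical kernels (scale_uniform and scale_irregular are made-up names, not from the question's code):

#include <math.h>
#include <omp.h>

// Uniform cost per iteration: schedule(static) splits the range once at
// loop entry, so there is no per-chunk bookkeeping at run time.
void scale_uniform(int n, float *v) {
    #pragma omp parallel for schedule(static)
    for (int i = 0; i < n; ++i)
        v[i] = sqrtf(v[i]);
}

// Irregular cost per iteration (driven here by a per-element repeat count):
// schedule(dynamic, 16) lets idle threads grab fresh chunks of 16 iterations,
// balancing the load at the price of synchronization for every chunk.
void scale_irregular(int n, const int *repeats, float *v) {
    #pragma omp parallel for schedule(dynamic, 16)
    for (int i = 0; i < n; ++i)
        for (int r = 0; r < repeats[i]; ++r)
            v[i] = sqrtf(v[i] + 1.0f);
}

Since every iteration of calc_all_distances costs the same, it matches the first case, and the static split wins by adding no run-time coordination.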

In calc_cluster_centroids: #pragma omp atomic should indeed protect the ++cluster_member_count[active_cluster]; statement. The same is true wherever an increment is performed on values shared between all threads, such as the new_cluster_centroid accumulation in the same loop. Also, cluster_member_count is allocated memory that is never freed (a memory leak).
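A minimal sketch of one possible fix, assuming the centroid accumulators are meant to start from zero on each call (the original += relies on that implicitly, so the memset below is an assumption) and protecting both shared updates with atomics:

#include <stdlib.h>
#include <string.h>

void calc_cluster_centroids(int dim, int n, int k, float *X,
                            int *cluster_assignment_index,
                            float *new_cluster_centroid) {
    int *cluster_member_count = (int *) calloc(k, sizeof(int));

    // Assumption: the accumulators must start at zero for the += below.
    memset(new_cluster_centroid, 0, (size_t)k * dim * sizeof(float));

    #pragma omp parallel for
    for (int i = 0; i < n; ++i) {
        int active_cluster = cluster_assignment_index[i];

        // Shared counter: a data race without the atomic.
        #pragma omp atomic update
        ++cluster_member_count[active_cluster];

        for (int j = 0; j < dim; ++j) {
            // Shared accumulator: also a data race without the atomic.
            #pragma omp atomic update
            new_cluster_centroid[active_cluster*dim + j] += X[i*dim + j];
        }
    }

    for (int i = 0; i < k; ++i) {
        if (cluster_member_count[i] == 0)
            continue; // skip empty clusters instead of dividing by zero
        for (int j = 0; j < dim; ++j)
            new_cluster_centroid[i*dim + j] /= cluster_member_count[i];
    }

    free(cluster_member_count); // fixes the leak mentioned above
}

Note that element-wise atomics can themselves become a bottleneck; per-thread private accumulators merged at the end (or an OpenMP array reduction) are a common alternative, but the sketch above is the most direct application of the point made here.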

Upvotes: 2
