Bailey C
Bailey C

Reputation: 78

No speedup in multi-threaded Monte Carlo calculation

I've built a sequential, multi-threaded, and multi-process (MPI) version of a Monte Carlo calculation to compare parallel programming techniques. Comparing the sequential code to the MPI code produces the expected results. For a very large number of samples, the MPI code runs about 5 times faster with 5 processes performing calculations. However, I can't get the multi-threaded version to run faster, even though the system monitor shows multiple cores working on the calculation. I am running the code on Linux. Except for MPI in the multi-process version, I don't use any external libraries.

What would cause the multi-threaded version to take effectively the same amount of time in this case, even though the calculations are uniformly distributed to threads that have been assigned to different cores? I've made everything possible local to the threaded functions to hopefully eliminate false sharing, but I saw no change compared to using global variables for everything.

Sequential version:

#include "common/common.hpp"


// Integral to evaluate
#define v(x)    exp(x)


using namespace std;

int main(int argc, char **argv)
{
    // Limits of integration
    const double a = 0.0, b = 1.0;

    // Read number of samples strata from command-line input
    uint nSamples = atoi(argv[1]);
    uint nStrata = atoi(argv[2]);

    srand((int)time(0));

    // Sums in each stratum
    vector<uint> nSamples_s(nStrata, 0);
    vector<double> sumX_s(nStrata, 0.0), sumX2_s(nStrata, 0.0);

    double x, delta = (b-a)/nStrata;
    uint s;

    double mean, var;

    for (uint i = 1; i <= nSamples; i++) {
        // Sample random variable
        x = a + (b-a)*((double)rand() / RAND_MAX);

        // Select the matching stratum
        s = nStrata*(x-a)/(b-a);
        s = (s == nStrata) ? nStrata - 1 : s;

        // Store sums
        nSamples_s[s]++;
        sumX_s[s] += delta*v(x);
        sumX2_s[s] += pow(delta*v(x), 2.0);
    }

    // Calculate summary statistics
    mean = 0.0;
    var = 0.0;
    for (uint j = 0; j < nStrata; j++) {
        mean += sumX_s[j]/nSamples_s[j];
        var += sumX2_s[j]/nSamples_s[j] - pow(sumX_s[j]/nSamples_s[j], 2.0);
    }

    // Output summary statistics
    cout << "\nIntegral estimate: " << mean
         << "\n\tstddev = " << sqrt(var)
         << "\n\tstderr = " << sqrt(var/nSamples) << endl;

    return 0;
}

Multi-threaded version:

#include "common/common.hpp"
#include <thread>
#include <mutex>


using namespace std;

// Mutex for modifying summary statistics
mutex mtx;

// Integral to evaluate
#define v(x)    exp(x)

double mean = 0.0, var = 0.0;


void partialSum(uint rank, uint numWorkers, double a, double b, uint nStrata, uint nSamples);


int main(int argc, char **argv)
{
    // Limits of integration
    const double a = 0.0, b = 1.0;

    // Read number of samples and strata from command-line input
    uint nSamples = atoi(argv[1]);
    uint nStrata = atoi(argv[2]);

    srand((int)time(0));

    // Worker threads
    const uint numWorkers = 5;
    vector<thread> workers;

    // Start threads
    for (uint t = 0; t < numWorkers; t++)
        workers.push_back(thread(partialSum, t, numWorkers, a, b, nStrata, nSamples));

    // Wait for thread execution
    for (uint t = 0; t < numWorkers; t++)
        workers[t].join();

    // Output summary statistics
    cout << "\nIntegral estimate: " << mean
         << "\n\tstddev = " << sqrt(var)
         << "\n\tstderr = " << sqrt(var/nSamples) << endl;

    return 0;
}

void partialSum(uint rank, uint numWorkers, double a, double b, uint nStrata, uint nSamples)
{
    uint nStrata_t, nSamples_t;     // Actual number of strata and samples handled by this thread
    uint stdStrata_t;               // Nominal number of strata per thread

    nStrata_t = stdStrata_t = nStrata / numWorkers;
    if (rank == numWorkers - 1)
        nStrata_t += nStrata % numWorkers;

    uint strataOffset = rank * stdStrata_t;

    nSamples_t = stdStrata_t * (nSamples / nStrata);
    if (rank == numWorkers - 1)
        nSamples_t += nSamples % nStrata;

    // Summed statistics for each stratum in this thread
    vector<uint> nSamples_st(nStrata_t, 0);
    vector<double> sumX_st(nStrata_t, 0.0), sumX2_st(nStrata_t, 0.0);

    // Width of integration region for each stratum and for this thread
    double delta_s = (b-a)/nStrata;
    double delta_t = delta_s * nStrata_t;

    double x;   // Sampling variable
    uint s;     // Corresponding stratum

    // Sum statistics
    for (uint i = 0; i < nSamples_t; i++) {
        // Sample random variable
        x = delta_t*((double)rand() / RAND_MAX);

        // Select the matching stratum
        s = nStrata_t*x/delta_t;
        s = (s == nStrata_t) ? nStrata_t - 1 : s;

        // Store sums
        nSamples_st[s]++;
        sumX_st[s] += delta_s*v(x + a + strataOffset*delta_s);
        sumX2_st[s] += pow(delta_s*v(x + a + strataOffset*delta_s), 2.0);
    }

    // Calculate summary statistics
    double partialMean = 0.0, partialVar = 0.0;
    for (uint j = 0; j < nStrata_t; j++) {
        partialMean += sumX_st[j]/nSamples_st[j];
        partialVar += sumX2_st[j]/nSamples_st[j] - pow(sumX_st[j]/nSamples_st[j], 2.0);
    }

    // Lock mutex until thread exit
    lock_guard<mutex> lockStats(mtx);

    // Add contributions from this thread to summary statistics
    mean += partialMean;
    var += partialVar;
}

MPI version:

#include "common/common.hpp"
#include <mpi.h>


// Limits of integration
const double a = 0.0, b = 1.0;

// Number of samples and strata
uint nSamples, nStrata;

// MPI process data
int numProcs, numWorkers, procRank;

// Integral to evaluate
#define v(x)    exp(x)


#define MPI_TAG_MEAN    0
#define MPI_TAG_VAR     1

void partialSum();
void collectSums();


using namespace std;

int main(int argc, char **argv)
{
    // MPI setup
    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &numProcs);
    MPI_Comm_rank(MPI_COMM_WORLD, &procRank);

    // Number of slave processes
    numWorkers = numProcs - 1;
    assert(numWorkers > 0);

    // Read number of samples and strata from command-line input
    nSamples = atoi(argv[1]);
    nStrata = atoi(argv[2]);

    srand((int)time(0));

    if (!procRank) {    // Process 0
        collectSums();
    } else {            // Worker processes
        partialSum();
    }

    MPI_Finalize();
    return 0;
}

void partialSum()
{
    int stdStrata_p, stdSamples_p;  // Nominal number of strata and samples per process
    int nStrata_p, nSamples_p;      // Actual number of strata and samples handled by this process

    nStrata_p = stdStrata_p = nStrata / numWorkers;
    if (procRank == numWorkers)
        nStrata_p += nStrata % numWorkers;

    int strataOffset = (procRank - 1) * stdStrata_p;

    nSamples_p = stdSamples_p = nStrata_p * (nSamples / nStrata);
    if (procRank == numWorkers)
        nSamples_p += nSamples % nStrata;

    // Sums in each stratum handled by this process
    vector<uint> nSamples_sp(nStrata_p, 0);
    vector<double> sumX_sp(nStrata_p, 0.0), sumX2_sp(nStrata_p, 0.0);

    // Width of integration region for each stratum and this process
    double delta_s = (b-a)/nStrata;
    double delta_p = delta_s*nStrata_p;

    double x;   // Sampling variable
    uint s;     // Corresponding stratum

    // Summed statistics
    double mean, var;

    for (int i = 0; i < nSamples_p; i++) {
        // Sample random variable
        x = delta_p*((double)rand() / RAND_MAX);

        // Select the matching stratum
        s = nStrata_p*x/delta_p;
        s = (s == nStrata_p) ? nStrata_p - 1 : s;

        // Store sums
        nSamples_sp[s]++;
        sumX_sp[s] += delta_s*v(x + a + strataOffset*delta_s);
        sumX2_sp[s] += pow(delta_s*v(x + a + strataOffset*delta_s), 2.0);
    }

    mean = 0.0;
    var = 0.0;
    for (int j = 0; j < nStrata_p; j++) {
        mean += sumX_sp[j]/nSamples_sp[j];
        var += sumX2_sp[j]/nSamples_sp[j] - pow(sumX_sp[j]/nSamples_sp[j], 2.0);
    }

    MPI_Send(&mean, 1, MPI_DOUBLE, 0, MPI_TAG_MEAN, MPI_COMM_WORLD);
    MPI_Send(&var, 1, MPI_DOUBLE, 0, MPI_TAG_VAR, MPI_COMM_WORLD);
}

void collectSums()
{
    double mean = 0.0, var = 0.0;

    for (int i = 0; i < 2*numWorkers; i++) {
        double readBuf;
        MPI_Status readStatus;

        MPI_Recv(&readBuf, 1, MPI_DOUBLE, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &readStatus);

        if (readStatus.MPI_TAG == MPI_TAG_MEAN)
            mean += readBuf;
        else if (readStatus.MPI_TAG == MPI_TAG_VAR)
            var += readBuf;
    }

    // Output summary statistics
    cout << "\nIntegral estimate: " << mean
         << "\n\tstddev = " << sqrt(var)
         << "\n\tstderr = " << sqrt(var/nSamples) << endl;
}

The programs were compiled and run as such:

$ g++ strat_samples.cpp -o strat_samples -std=gnu++11 -O2 -Wall
$ time ./strat_samples 100000000 100

Integral estimate: 1.71828
    stddev = 0.000515958
    stderr = 5.15958e-08

real    0m18.709s
user    0m18.704s
sys     0m0.000s


$ g++ strat_samples_thd.cpp -o strat_samples_thd -std=gnu++11 -lpthread -O2 -Wall
$ time ./strat_samples_thd 100000000 100
Integral estimate: 1.71828
    stddev = 0.000515951
    stderr = 5.15951e-08

real    0m18.981s
user    0m39.608s
sys     0m44.588s


$ mpic++ strat_samples_mpi.cpp -o strat_samples_mpi -std=gnu++11 -O2 -Wall
$ time mpirun -n 6 ./strat_samples_mpi 100000000 100

Integral estimate: 1.71828
    stddev = 0.000515943
    stderr = 5.15943e-08

real    0m7.601s
user    0m32.912s
sys     0m5.696s

Note: the speed-up of the MPI version is even more significant when you start adding 0's to the command-line input.

Upvotes: 1

Views: 619

Answers (1)

Daniel Langr
Daniel Langr

Reputation: 23497

Every pseudo random number generator (PRNG) has a state. In case of rand its hidden, however, its usage in multithreaded codes results in data races and therefore undefined behavior. Moreover, rand has other significant drawbacks as well.

If C++11 is available to you, use its random library part, use one PRNG per thread, use proper distribution, and be careful not to seed PRNGs with same values.

Upvotes: 1

Related Questions