Reputation: 78
I've built a sequential, multi-threaded, and multi-process (MPI) version of a Monte Carlo calculation to compare parallel programming techniques. Comparing the sequential code to the MPI code produces the expected results. For a very large number of samples, the MPI code runs about 5 times faster with 5 processes performing calculations. However, I can't get the multi-threaded version to run faster, even though the system monitor shows multiple cores working on the calculation. I am running the code on Linux. Except for MPI in the multi-process version, I don't use any external libraries.
What would cause the multi-threaded version to take effectively the same amount of time in this case, even though the calculations are uniformly distributed to threads that have been assigned to different cores? I've made everything possible local to the threaded functions to hopefully eliminate false sharing, but I saw no change compared to using global variables for everything.
Sequential version:
#include "common/common.hpp"
// Integral to evaluate
#define v(x) exp(x)
using namespace std;
int main(int argc, char **argv)
{
// Limits of integration
const double a = 0.0, b = 1.0;
// Read number of samples and strata from command-line input
uint nSamples = atoi(argv[1]);
uint nStrata = atoi(argv[2]);
srand((int)time(0));
// Sums in each stratum
vector<uint> nSamples_s(nStrata, 0);
vector<double> sumX_s(nStrata, 0.0), sumX2_s(nStrata, 0.0);
double x, delta = (b-a)/nStrata;
uint s;
double mean, var;
for (uint i = 1; i <= nSamples; i++) {
// Sample random variable
x = a + (b-a)*((double)rand() / RAND_MAX);
// Select the matching stratum
s = nStrata*(x-a)/(b-a);
s = (s == nStrata) ? nStrata - 1 : s;
// Store sums
nSamples_s[s]++;
sumX_s[s] += delta*v(x);
sumX2_s[s] += pow(delta*v(x), 2.0);
}
// Calculate summary statistics
mean = 0.0;
var = 0.0;
for (uint j = 0; j < nStrata; j++) {
mean += sumX_s[j]/nSamples_s[j];
var += sumX2_s[j]/nSamples_s[j] - pow(sumX_s[j]/nSamples_s[j], 2.0);
}
// Output summary statistics
cout << "\nIntegral estimate: " << mean
<< "\n\tstddev = " << sqrt(var)
<< "\n\tstderr = " << sqrt(var/nSamples) << endl;
return 0;
}
Multi-threaded version:
#include "common/common.hpp"
#include <thread>
#include <mutex>
using namespace std;
// Mutex for modifying summary statistics
mutex mtx;
// Integral to evaluate
#define v(x) exp(x)
double mean = 0.0, var = 0.0;
void partialSum(uint rank, uint numWorkers, double a, double b, uint nStrata, uint nSamples);
int main(int argc, char **argv)
{
// Limits of integration
const double a = 0.0, b = 1.0;
// Read number of samples and strata from command-line input
uint nSamples = atoi(argv[1]);
uint nStrata = atoi(argv[2]);
srand((int)time(0));
// Worker threads
const uint numWorkers = 5;
vector<thread> workers;
// Start threads
for (uint t = 0; t < numWorkers; t++)
workers.push_back(thread(partialSum, t, numWorkers, a, b, nStrata, nSamples));
// Wait for thread execution
for (uint t = 0; t < numWorkers; t++)
workers[t].join();
// Output summary statistics
cout << "\nIntegral estimate: " << mean
<< "\n\tstddev = " << sqrt(var)
<< "\n\tstderr = " << sqrt(var/nSamples) << endl;
return 0;
}
void partialSum(uint rank, uint numWorkers, double a, double b, uint nStrata, uint nSamples)
{
uint nStrata_t, nSamples_t; // Actual number of strata and samples handled by this thread
uint stdStrata_t; // Nominal number of strata per thread
nStrata_t = stdStrata_t = nStrata / numWorkers;
if (rank == numWorkers - 1)
nStrata_t += nStrata % numWorkers;
uint strataOffset = rank * stdStrata_t;
nSamples_t = stdStrata_t * (nSamples / nStrata);
if (rank == numWorkers - 1)
nSamples_t += nSamples % nStrata;
// Summed statistics for each stratum in this thread
vector<uint> nSamples_st(nStrata_t, 0);
vector<double> sumX_st(nStrata_t, 0.0), sumX2_st(nStrata_t, 0.0);
// Width of integration region for each stratum and for this thread
double delta_s = (b-a)/nStrata;
double delta_t = delta_s * nStrata_t;
double x; // Sampling variable
uint s; // Corresponding stratum
// Sum statistics
for (uint i = 0; i < nSamples_t; i++) {
// Sample random variable
x = delta_t*((double)rand() / RAND_MAX);
// Select the matching stratum
s = nStrata_t*x/delta_t;
s = (s == nStrata_t) ? nStrata_t - 1 : s;
// Store sums
nSamples_st[s]++;
sumX_st[s] += delta_s*v(x + a + strataOffset*delta_s);
sumX2_st[s] += pow(delta_s*v(x + a + strataOffset*delta_s), 2.0);
}
// Calculate summary statistics
double partialMean = 0.0, partialVar = 0.0;
for (uint j = 0; j < nStrata_t; j++) {
partialMean += sumX_st[j]/nSamples_st[j];
partialVar += sumX2_st[j]/nSamples_st[j] - pow(sumX_st[j]/nSamples_st[j], 2.0);
}
// Lock mutex until thread exit
lock_guard<mutex> lockStats(mtx);
// Add contributions from this thread to summary statistics
mean += partialMean;
var += partialVar;
}
MPI version:
#include "common/common.hpp"
#include <mpi.h>
// Limits of integration
const double a = 0.0, b = 1.0;
// Number of samples and strata
uint nSamples, nStrata;
// MPI process data
int numProcs, numWorkers, procRank;
// Integral to evaluate
#define v(x) exp(x)
#define MPI_TAG_MEAN 0
#define MPI_TAG_VAR 1
void partialSum();
void collectSums();
using namespace std;
int main(int argc, char **argv)
{
// MPI setup
MPI_Init(&argc, &argv);
MPI_Comm_size(MPI_COMM_WORLD, &numProcs);
MPI_Comm_rank(MPI_COMM_WORLD, &procRank);
// Number of worker processes
numWorkers = numProcs - 1;
assert(numWorkers > 0);
// Read number of samples and strata from command-line input
nSamples = atoi(argv[1]);
nStrata = atoi(argv[2]);
srand((int)time(0));
if (!procRank) { // Process 0
collectSums();
} else { // Worker processes
partialSum();
}
MPI_Finalize();
return 0;
}
void partialSum()
{
int stdStrata_p, stdSamples_p; // Nominal number of strata and samples per process
int nStrata_p, nSamples_p; // Actual number of strata and samples handled by this process
nStrata_p = stdStrata_p = nStrata / numWorkers;
if (procRank == numWorkers)
nStrata_p += nStrata % numWorkers;
int strataOffset = (procRank - 1) * stdStrata_p;
nSamples_p = stdSamples_p = nStrata_p * (nSamples / nStrata);
if (procRank == numWorkers)
nSamples_p += nSamples % nStrata;
// Sums in each stratum handled by this process
vector<uint> nSamples_sp(nStrata_p, 0);
vector<double> sumX_sp(nStrata_p, 0.0), sumX2_sp(nStrata_p, 0.0);
// Width of integration region for each stratum and this process
double delta_s = (b-a)/nStrata;
double delta_p = delta_s*nStrata_p;
double x; // Sampling variable
uint s; // Corresponding stratum
// Summed statistics
double mean, var;
for (int i = 0; i < nSamples_p; i++) {
// Sample random variable
x = delta_p*((double)rand() / RAND_MAX);
// Select the matching stratum
s = nStrata_p*x/delta_p;
s = (s == nStrata_p) ? nStrata_p - 1 : s;
// Store sums
nSamples_sp[s]++;
sumX_sp[s] += delta_s*v(x + a + strataOffset*delta_s);
sumX2_sp[s] += pow(delta_s*v(x + a + strataOffset*delta_s), 2.0);
}
mean = 0.0;
var = 0.0;
for (int j = 0; j < nStrata_p; j++) {
mean += sumX_sp[j]/nSamples_sp[j];
var += sumX2_sp[j]/nSamples_sp[j] - pow(sumX_sp[j]/nSamples_sp[j], 2.0);
}
MPI_Send(&mean, 1, MPI_DOUBLE, 0, MPI_TAG_MEAN, MPI_COMM_WORLD);
MPI_Send(&var, 1, MPI_DOUBLE, 0, MPI_TAG_VAR, MPI_COMM_WORLD);
}
void collectSums()
{
double mean = 0.0, var = 0.0;
for (int i = 0; i < 2*numWorkers; i++) {
double readBuf;
MPI_Status readStatus;
MPI_Recv(&readBuf, 1, MPI_DOUBLE, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &readStatus);
if (readStatus.MPI_TAG == MPI_TAG_MEAN)
mean += readBuf;
else if (readStatus.MPI_TAG == MPI_TAG_VAR)
var += readBuf;
}
// Output summary statistics
cout << "\nIntegral estimate: " << mean
<< "\n\tstddev = " << sqrt(var)
<< "\n\tstderr = " << sqrt(var/nSamples) << endl;
}
The programs were compiled and run as follows:
$ g++ strat_samples.cpp -o strat_samples -std=gnu++11 -O2 -Wall
$ time ./strat_samples 100000000 100
Integral estimate: 1.71828
stddev = 0.000515958
stderr = 5.15958e-08
real 0m18.709s
user 0m18.704s
sys 0m0.000s
$ g++ strat_samples_thd.cpp -o strat_samples_thd -std=gnu++11 -lpthread -O2 -Wall
$ time ./strat_samples_thd 100000000 100
Integral estimate: 1.71828
stddev = 0.000515951
stderr = 5.15951e-08
real 0m18.981s
user 0m39.608s
sys 0m44.588s
$ mpic++ strat_samples_mpi.cpp -o strat_samples_mpi -std=gnu++11 -O2 -Wall
$ time mpirun -n 6 ./strat_samples_mpi 100000000 100
Integral estimate: 1.71828
stddev = 0.000515943
stderr = 5.15943e-08
real 0m7.601s
user 0m32.912s
sys 0m5.696s
Note: the speed-up of the MPI version is even more significant when you start adding zeros to the command-line input.
Upvotes: 1
Views: 619
Reputation: 23497
Every pseudo-random number generator (PRNG) has a state. In the case of rand, that state is hidden, but calling it from multiple threads produces data races and therefore undefined behavior. Moreover, rand has other significant drawbacks as well.
If C++11 is available to you, use its <random> library instead: create one PRNG per thread, draw from a proper distribution object, and be careful not to seed the PRNGs with the same values.
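As a rough sketch of that approach (this is illustrative, not the asker's code; the function name threadSample and the sample count are made up), each thread below owns its own std::mt19937, seeded through a std::seed_seq that mixes std::random_device output with the thread's rank, and samples from a std::uniform_real_distribution instead of calling rand:
#include <cmath>
#include <functional>
#include <iostream>
#include <random>
#include <thread>
#include <vector>
// Each thread owns its own engine, so sampling touches no shared state.
void threadSample(unsigned rank, double a, double b, unsigned n, double &out)
{
    // Mix the rank into the seed so no two threads can end up with
    // identical seeds, even if std::random_device repeats a value.
    std::seed_seq seq{std::random_device{}(), rank};
    std::mt19937 gen(seq);
    std::uniform_real_distribution<double> dist(a, b);
    double sum = 0.0;
    for (unsigned i = 0; i < n; i++)
        sum += std::exp(dist(gen)); // dist(gen) replaces a + (b-a)*rand()/RAND_MAX
    out = (b - a) * sum / n;        // each thread writes only to its own slot
}
int main()
{
    const unsigned numWorkers = 5;
    std::vector<double> results(numWorkers);
    std::vector<std::thread> workers;
    for (unsigned t = 0; t < numWorkers; t++)
        workers.emplace_back(threadSample, t, 0.0, 1.0, 1000000u, std::ref(results[t]));
    for (auto &w : workers)
        w.join();
    for (double r : results)
        std::cout << r << '\n';
    return 0;
}
Each worker's estimate should land near e - 1 = 1.71828 for this integrand; the point is that dist(gen) touches only thread-local state, so the threads no longer serialize on rand's hidden shared state.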
Upvotes: 1