Reputation: 1534
I have this small piece of code which basically does simple gathering operations, and I am trying to use OpenMP to make it multi-threaded:
// Scatter-add: for every input column j, accumulate w * in.col(j) into
// values.col(offset(i, j)) for each of the offset/weight rows.
// NOTE(review): the per-element #pragma omp atomic serializes every float
// update across threads, which is why this runs slower than serial code.
Eigen::MatrixXf methodA(const Eigen::MatrixXf& in, const Eigen::MatrixXi& offset, const Eigen::MatrixXf& weight) {
assert(offset.size() == weight.size());
// Must be zero-initialized: the loop below accumulates with +=, and the
// plain MatrixXf(rows, cols) constructor leaves the storage uninitialized.
Eigen::MatrixXf values = Eigen::MatrixXf::Zero(in.rows(), 2000);
#pragma omp parallel for
for (int j = 0; j < in.cols(); ++j) {
for (int i = 0; i < offset.rows(); ++i) {
int o = offset(i, j); // guaranteed to index between 0 and values.cols() - 1
float w = weight(i, j);
// Update column o i.e. values.col(o) += w * in.col(j);
for (int k = 0; k < in.rows(); ++k) {
// Atomic is required: different j iterations may target the same column o.
#pragma omp atomic
values(k, o) += w * in(k, j);
}
}
}
return values;
}
Typical dimension of the matrices are:
in = 10 x N
offset, weight : 6 X N
Here N is very large like 20 million.
Since there is a race condition, I used an atomic operation. However, this results in much slower code (2-3 times slower) than the serial code without the OpenMP #pragmas.
The full code to run this can be found here.
I have no experience in multi-threaded programming beyond simple use of parallel for. So what am I doing wrong here?
Also I am okay with non OpenMP solutions like TBB.
Upvotes: 0
Views: 1477
Reputation: 121
I do not yet have the reputation to comment, but the (excellent!) answer by Avi Ginsburg contains some slight mistakes. Also, the OP most likely has a mistake in the code.
First error: after declaring weight(6, N), setRandom() is called on the matrix 'in' a second time instead of on the weights, so 'weight' is never initialized and the output is undefined. That's why the answer by Avi Ginsburg only appears to work.
Second error: in methodD of Avi Ginsburg, the matrices global_values and values are not initialized to zero. That leads to undefined output. Indeed, for me, when using OpenMP, methodD gave the wrong results. The following code fixes all of that. Compile with:
g++ filename.cpp -o execname -I/path/to/eigen3 -O2 -fopenmp
#include <Eigen/Core>
#include <chrono>
#include <random>
#include <iostream>
// Scatter-add with per-element atomics (slow: every float update is
// serialized across threads). Kept for timing comparison against B/C/D.
Eigen::MatrixXf methodA(const Eigen::MatrixXf& in, const Eigen::MatrixXi& offset, const Eigen::MatrixXf& weight) {
assert(offset.size() == weight.size());
// Zero-initialize: the += accumulation below would otherwise read
// indeterminate memory (MatrixXf(rows, cols) does not zero its storage).
Eigen::MatrixXf values = Eigen::MatrixXf::Zero(in.rows(), 2000);
#pragma omp parallel for
for (int j = 0; j < in.cols(); ++j) {
for (int i = 0; i < offset.rows(); ++i) {
int o = offset(i, j); // guaranteed to index between 0 and values.cols() - 1
float w = weight(i, j);
// Update column o i.e. values.col(o) += w * in.col(j);
for (int k = 0; k < in.rows(); ++k) {
// Atomic is required: different j iterations may target the same column o.
#pragma omp atomic
values(k, o) += w * in(k, j);
}
}
}
return values;
}
// Serial reference implementation of the scatter-add (no OpenMP).
Eigen::MatrixXf methodB(const Eigen::MatrixXf& in, const Eigen::MatrixXi& offset, const Eigen::MatrixXf& weight) {
assert(offset.size() == weight.size());
// Zero-initialize: the += accumulation below would otherwise read
// indeterminate memory (MatrixXf(rows, cols) does not zero its storage).
Eigen::MatrixXf values = Eigen::MatrixXf::Zero(in.rows(), 2000);
for (int j = 0; j < in.cols(); ++j) {
for (int i = 0; i < offset.rows(); ++i) {
int o = offset(i, j);
float w = weight(i, j);
// Update column o i.e. values.col(o) += w * in.col(j);
for (int k = 0; k < in.rows(); ++k)
values(k, o) += w * in(k, j);
}
}
return values;
}
// Parallel scatter-add using per-element atomics (same approach as methodA,
// without the braces around the inner loop body).
Eigen::MatrixXf methodC(const Eigen::MatrixXf& in, const Eigen::MatrixXi& offset, const Eigen::MatrixXf& weight) {
assert(offset.size() == weight.size());
// Zero-initialize: the += accumulation below would otherwise read
// indeterminate memory (MatrixXf(rows, cols) does not zero its storage).
Eigen::MatrixXf values = Eigen::MatrixXf::Zero(in.rows(), 2000);
#pragma omp parallel for
for (int j = 0; j < in.cols(); ++j) {
for (int i = 0; i < offset.rows(); ++i) {
int o = offset(i, j);
float w = weight(i, j);
// Update column o i.e. values.col(o) += w * in.col(j);
for (int k = 0; k < in.rows(); ++k)
#pragma omp atomic
values(k, o) += w * in(k, j);
}
}
return values;
}
// Parallel scatter-add with per-thread accumulators: each thread fills its
// own private copy of the output matrix, then the copies are reduced into
// global_values under a critical section. This avoids the per-element
// atomics of methodA/C at the cost of one 10x2000 matrix per thread.
Eigen::MatrixXf methodD(const Eigen::MatrixXf& in, const Eigen::MatrixXi& offset, const Eigen::MatrixXf& weight) {
assert(offset.size() == weight.size());
Eigen::MatrixXf global_values = Eigen::MatrixXf::Zero(in.rows(), 2000);
// default(none) requires every variable referenced in the region to have an
// explicit data-sharing clause; without shared(...) this does not compile
// under -fopenmp (OpenMP 4.0 removed the const-predetermined-shared rule).
#pragma omp parallel default(none) shared(in, offset, weight, global_values)
{
// Per-thread accumulator; must start at zero for the += below.
Eigen::MatrixXf values = Eigen::MatrixXf::Zero(in.rows(), 2000);
#pragma omp for
for (int j = 0; j < in.cols(); ++j) {
for (int i = 0; i < offset.rows(); ++i) {
int o = offset(i, j);
float w = weight(i, j);
// Update column o i.e. values.col(o) += w * in.col(j);
for (int k = 0; k < in.rows(); ++k)
values(k, o) += w * in(k, j);
}
}
// Reduce the private copies one thread at a time.
#pragma omp critical
{
global_values += values;
}
}
return global_values;
}
int main(int argc, char const *argv[]) {
typedef std::chrono::high_resolution_clock TimerType;
Eigen::initParallel();
int N = 960 * 720 * 20;
float norm;
Eigen::MatrixXf in(11, N);
in.setRandom();
Eigen::MatrixXf weight(6, N);
weight.setRandom();
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> dis(0, 1999);
Eigen::MatrixXf outA, outB, outC, outD;
Eigen::MatrixXi offset(6, N);
for (int i = 0; i < offset.size(); i++)
offset(i) = dis(gen);
{
auto tb = TimerType::now() ;
outA = methodA(in, offset, weight);
auto te = TimerType::now();
std::cout << "Total Time (methodA) " <<
std::chrono::duration_cast<std::chrono::milliseconds>(te - tb).count()
<< " milliseconds.\n";
}
{
auto tb = TimerType::now() ;
outB = methodB(in, offset, weight);
auto te = TimerType::now();
std::cout << "Total Time (methodB) " <<
std::chrono::duration_cast<std::chrono::milliseconds>(te - tb).count()
<< " milliseconds.\n";
}
{
auto tb = TimerType::now();
outC = methodC(in, offset, weight);
auto te = TimerType::now();
std::cout << "Total Time (methodC) " <<
std::chrono::duration_cast<std::chrono::milliseconds>(te - tb).count()
<< " milliseconds.\n";
}
{
auto tb = TimerType::now();
outD = methodD(in, offset, weight);
auto te = TimerType::now();
std::cout << "Total Time (methodD) " <<
std::chrono::duration_cast<std::chrono::milliseconds>(te - tb).count()
<< " milliseconds.\n";
}
norm = outB.cwiseAbs().sum();
std::cout << "sum|A-B| = " << (outA - outB).cwiseAbs().sum() / norm << std::endl;
std::cout << "sum|B-C| = " << (outB - outC).cwiseAbs().sum() / norm << std::endl;
std::cout << "sum|C-D| = " << (outC - outD).cwiseAbs().sum() / norm << std::endl;
std::cout << "sum|A-C| = " << (outA - outC).cwiseAbs().sum() / norm << std::endl;
std::cout << "sum|B-D| = " << (outB - outD).cwiseAbs().sum() / norm << std::endl;
std::cout << "sum|A-D| = " << (outA - outD).cwiseAbs().sum() / norm << std::endl;
return 0;
}
Upvotes: 3
Reputation: 10596
You have one main issue: threads waiting for one another. The atomic operation causes all other threads to wait until the current thread finishes the operation. Of the 2.22 seconds it takes on my computer, more than 2 seconds is spent waiting for the other threads to finish. In this simple case, the simplest way to fix it is to give each thread its own copy of the matrix to work on and sum them afterwards. I realize that the memory requirements increase dramatically and might not be relevant, but just to prove the point, I've added a methodD
to your code and changed the timer to work with chrono
:
#include <Eigen/Core>
#include <chrono>
#include <random>
#include <iostream>
// Serial reference implementation of the scatter-add (no OpenMP).
Eigen::MatrixXf methodB(const Eigen::MatrixXf& in, const Eigen::MatrixXi& offset, const Eigen::MatrixXf& weight) {
assert(offset.size() == weight.size());
// Zero-initialize: the += accumulation below would otherwise read
// indeterminate memory (MatrixXf(rows, cols) does not zero its storage).
Eigen::MatrixXf values = Eigen::MatrixXf::Zero(in.rows(), 2000);
for (int j = 0; j < in.cols(); ++j) {
for (int i = 0; i < offset.rows(); ++i) {
int o = offset(i, j);
float w = weight(i, j);
// Update column o i.e. values.col(o) += w * in.col(j);
for (int k = 0; k < in.rows(); ++k)
values(k, o) += w * in(k, j);
}
}
return values;
}
// Parallel scatter-add using per-element atomics (slow: every float update
// is serialized across threads).
Eigen::MatrixXf methodC(const Eigen::MatrixXf& in, const Eigen::MatrixXi& offset, const Eigen::MatrixXf& weight) {
assert(offset.size() == weight.size());
// Zero-initialize: the += accumulation below would otherwise read
// indeterminate memory (MatrixXf(rows, cols) does not zero its storage).
Eigen::MatrixXf values = Eigen::MatrixXf::Zero(in.rows(), 2000);
#pragma omp parallel for
for (int j = 0; j < in.cols(); ++j) {
for (int i = 0; i < offset.rows(); ++i) {
int o = offset(i, j);
float w = weight(i, j);
// Update column o i.e. values.col(o) += w * in.col(j);
for (int k = 0; k < in.rows(); ++k)
#pragma omp atomic
values(k, o) += w * in(k, j);
}
}
return values;
}
// Parallel scatter-add with per-thread accumulators: each thread fills its
// own private copy of the output matrix, then the copies are reduced into
// global_values under a critical section. Avoids per-element atomics.
Eigen::MatrixXf methodD(const Eigen::MatrixXf& in, const Eigen::MatrixXi& offset, const Eigen::MatrixXf& weight) {
assert(offset.size() == weight.size());
// Bug fix: both accumulators must be zero-initialized; the original used
// the plain MatrixXf constructor, which leaves storage indeterminate and
// produces undefined output after the += accumulation.
Eigen::MatrixXf global_values = Eigen::MatrixXf::Zero(in.rows(), 2000);
#pragma omp parallel
{
// Per-thread accumulator; must start at zero for the += below.
Eigen::MatrixXf values = Eigen::MatrixXf::Zero(in.rows(), 2000);
#pragma omp for
for (int j = 0; j < in.cols(); ++j) {
for (int i = 0; i < offset.rows(); ++i) {
int o = offset(i, j);
float w = weight(i, j);
// Update column o i.e. values.col(o) += w * in.col(j);
for (int k = 0; k < in.rows(); ++k)
values(k, o) += w * in(k, j);
}
}
// Reduce the private copies one thread at a time.
#pragma omp critical
{
global_values += values;
}
}
return global_values;
}
// Benchmark driver: builds random inputs, times methods B-D, and prints the
// pairwise differences between their outputs.
int main(int argc, char const *argv[]) {
typedef std::chrono::high_resolution_clock TimerType;
Eigen::initParallel();
int N = 960 * 720 * 20;
Eigen::MatrixXf in(11, N);
in.setRandom();
Eigen::MatrixXf weight(6, N);
// Bug fix: the original called in.setRandom() a second time here, leaving
// 'weight' uninitialized (indeterminate values) and the outputs undefined.
weight.setRandom();
// Random column offsets into the 2000-column output matrix.
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> dis(0, 1999);
Eigen::MatrixXf outB, outC, outD;
Eigen::MatrixXi offset(6, N);
for (int i = 0; i < offset.size(); i++)
offset(i) = dis(gen);
{
auto tb = TimerType::now() ;
outB = methodB(in, offset, weight);
auto te = TimerType::now();
std::cout << "Total Time (methodB) " <<
std::chrono::duration_cast<std::chrono::milliseconds>(te - tb).count()
<< " milliseconds.\n";
}
{
auto tb = TimerType::now();
outC = methodC(in, offset, weight);
auto te = TimerType::now();
std::cout << "Total Time (methodC) " <<
std::chrono::duration_cast<std::chrono::milliseconds>(te - tb).count()
<< " milliseconds.\n";
}
{
auto tb = TimerType::now();
outD = methodD(in, offset, weight);
auto te = TimerType::now();
std::cout << "Total Time (methodD) " <<
std::chrono::duration_cast<std::chrono::milliseconds>(te - tb).count()
<< " milliseconds.\n";
}
// Sanity checks: all three methods should compute the same result.
std::cout << "sum|B-C| = " << (outB - outC).cwiseAbs().sum() << std::endl;
std::cout << "sum|B-D| = " << (outB - outD).cwiseAbs().sum() << std::endl;
return 0;
}
This gives me:
Total Time (methodB) 2006 milliseconds.
Total Time (methodC) 3469 milliseconds.
Total Time (methodD) 366 milliseconds.
sum|B-C| = 0
sum|B-D| = 1.10737e-037
Upvotes: 3