iNFINITEi
iNFINITEi

Reputation: 1534

Parallel update of matrix columns using OpenMP atomic

I have this small piece of code which basically does simple gathering operations, and I am trying to use OpenMP to make it multi-threaded:

// Scatter-add: for every input column j, add w * in.col(j) into output
// column offset(i, j), for each of the offset.rows() (offset, weight) pairs.
// Parallelized over input columns; atomics guard the concurrent column writes.
Eigen::MatrixXf methodA(const Eigen::MatrixXf& in, const Eigen::MatrixXi& offset, const Eigen::MatrixXf& weight) {
  assert(offset.size() == weight.size());
  // Bug fix: the accumulator must start at zero. The original declared
  // `values(in.rows(), 2000)` uninitialized, so the += below accumulated
  // onto garbage and the result was undefined.
  Eigen::MatrixXf values = Eigen::MatrixXf::Zero(in.rows(), 2000);

  #pragma omp parallel for
  for (int j = 0; j < in.cols(); ++j) {

    for (int i = 0; i < offset.rows(); ++i) {
      int o = offset(i, j); // assumed to index a valid output column (0..1999) — TODO confirm with caller
      float w = weight(i, j);

      // Update column o i.e. values.col(o) += w * in.col(j).
      // Different threads may target the same column o, hence the atomic.
      for (int k = 0; k < in.rows(); ++k) {
        #pragma omp atomic
        values(k, o) += w * in(k, j);
      }
    }
  }
  return values;
}

Typical dimension of the matrices are:

in =  10 x N
offset, weight :  6 X N
Here N is very large like 20 million.

Since there is a race condition, I used an atomic operation. However, this results in much slower code (2-3 times slower) than the serial code without the OpenMP #pragmas.

Full code to run this can be found here.

I have no experience in multi-threaded programming beyond simple uses of parallel for. So what am I doing wrong here?

Also I am okay with non OpenMP solutions like TBB.

Upvotes: 0

Views: 1477

Answers (2)

Rutger
Rutger

Reputation: 121

I do not yet have the reputation to comment, but the (excellent!) answer by Avi Ginsburg contains some slight mistakes. Also, the OP most likely has a mistake in the code.

First error: after declaring weight(6, N), it is the matrix 'in' that is filled with random values a second time; this should have been the weights. As a result, the weight matrix is never initialized and the outputs are meaningless (they just happen to agree with each other, which is why the comparison in Avi Ginsburg's answer still appears to work).

Second error: in methodD of Avi Ginsburg, the matrices global_values and values are not initialized to zero. That leads to undefined output. Indeed, for me, when using OpenMP, methodD gave the wrong results. The following code fixes all of that. Compile with:

g++ filename.cpp -o execname -I/path/to/eigen3 -O2 -fopenmp

#include <Eigen/Core>
#include <chrono>
#include <random>
#include <iostream>

// Naive parallel scatter-add: one atomic per scalar update. Correct but slow
// (heavy contention); kept for benchmarking against methodB/C/D.
Eigen::MatrixXf methodA(const Eigen::MatrixXf& in, const Eigen::MatrixXi& offset, const Eigen::MatrixXf& weight) {
  assert(offset.size() == weight.size());
  // Bug fix: zero-initialize the accumulator; the original left it
  // uninitialized, so += accumulated onto garbage.
  Eigen::MatrixXf values = Eigen::MatrixXf::Zero(in.rows(), 2000);

  #pragma omp parallel for
  for (int j = 0; j < in.cols(); ++j) {

    for (int i = 0; i < offset.rows(); ++i) {
      int o = offset(i, j); // target output column, assumed within 0..1999
      float w = weight(i, j);

      // Update column o i.e. values.col(o) += w * in.col(j); atomics guard
      // concurrent writes from threads whose offsets collide.
      for (int k = 0; k < in.rows(); ++k) {
        #pragma omp atomic
        values(k, o) += w * in(k, j);
      }
    }
  }
  return values;
}

// Serial reference implementation of the scatter-add; the baseline the
// parallel variants are compared against.
Eigen::MatrixXf methodB(const Eigen::MatrixXf& in, const Eigen::MatrixXi& offset, const Eigen::MatrixXf& weight) {
    assert(offset.size() == weight.size());
    // Bug fix: zero-initialize — the original accumulated onto an
    // uninitialized matrix, producing undefined results.
    Eigen::MatrixXf values = Eigen::MatrixXf::Zero(in.rows(), 2000);
    for (int j = 0; j < in.cols(); ++j) {

        for (int i = 0; i < offset.rows(); ++i) {
            int o = offset(i, j); // target output column, assumed within 0..1999
            float w = weight(i, j);

            // Update column o i.e. values.col(o) += w * in.col(j);
            for (int k = 0; k < in.rows(); ++k)
                values(k, o) += w * in(k, j);
        }
    }
    return values;
}

// Parallel scatter-add with per-scalar atomics (same strategy as methodA);
// demonstrates how atomic contention makes it slower than the serial methodB.
Eigen::MatrixXf methodC(const Eigen::MatrixXf& in, const Eigen::MatrixXi& offset, const Eigen::MatrixXf& weight) {
    assert(offset.size() == weight.size());
    // Bug fix: zero-initialize the accumulator (was uninitialized).
    Eigen::MatrixXf values = Eigen::MatrixXf::Zero(in.rows(), 2000);

#pragma omp parallel for
    for (int j = 0; j < in.cols(); ++j) {

        for (int i = 0; i < offset.rows(); ++i) {
            int o = offset(i, j); // target output column, assumed within 0..1999
            float w = weight(i, j);

            // Update column o i.e. values.col(o) += w * in.col(j);
            for (int k = 0; k < in.rows(); ++k)
#pragma omp atomic
                values(k, o) += w * in(k, j);
        }
    }
    return values;
}
// Fast parallel scatter-add: each thread accumulates into a private copy of
// the output, then the copies are reduced under a critical section. No
// per-element atomics, at the cost of one 2000-column matrix per thread.
Eigen::MatrixXf methodD(const Eigen::MatrixXf& in, const Eigen::MatrixXi& offset, const Eigen::MatrixXf& weight) {
    assert(offset.size() == weight.size());
    Eigen::MatrixXf global_values = Eigen::MatrixXf::Zero(in.rows(), 2000);

// Fix: with default(none), every referenced outer variable must be listed
// explicitly. Since OpenMP 4.0, const-qualified variables are no longer
// predetermined shared, so GCC 9+ rejects the original pragma.
#pragma omp parallel default(none) shared(in, offset, weight, global_values)
    {
        // Thread-private accumulator, zero-initialized.
        Eigen::MatrixXf values = Eigen::MatrixXf::Zero(in.rows(), 2000);
#pragma omp for
        for (int j = 0; j < in.cols(); ++j) {

            for (int i = 0; i < offset.rows(); ++i) {
                int o = offset(i, j); // target output column, assumed within 0..1999
                float w = weight(i, j);

                // Update column o i.e. values.col(o) += w * in.col(j);
                for (int k = 0; k < in.rows(); ++k)
                    values(k, o) += w * in(k, j);
            }
        }

// Reduce the per-thread partial sums one thread at a time.
#pragma omp critical
        {
            global_values += values;
        }
    }
    return global_values;
}

// Benchmark driver: builds random inputs of the OP's problem size, times the
// four scatter-add variants, and prints pairwise relative differences (all
// should be ~0, up to float summation-order noise in the parallel methods).
int main(int argc, char const *argv[]) {
    typedef std::chrono::high_resolution_clock TimerType;

    // Make Eigen safe to use from multiple threads.
    Eigen::initParallel();

    // N ~ 13.8 million columns (the OP reported ~20 million).
    int N = 960 * 720 * 20;
    float norm;

    Eigen::MatrixXf in(11, N);
    in.setRandom();

    Eigen::MatrixXf weight(6, N);
    weight.setRandom();

    // Random output-column indices in [0, 1999], matching the 2000-column
    // accumulator allocated inside each method.
    std::random_device rd;
    std::mt19937 gen(rd());
    std::uniform_int_distribution<> dis(0, 1999);

    Eigen::MatrixXf outA, outB, outC, outD;

    Eigen::MatrixXi offset(6, N);
    for (int i = 0; i < offset.size(); i++)
        offset(i) = dis(gen);

    // Time each method with an identical harness; scopes keep tb/te local.
    {
        auto tb = TimerType::now() ;
        outA = methodA(in, offset, weight);
        auto te = TimerType::now();
        std::cout << "Total Time (methodA) " << 
            std::chrono::duration_cast<std::chrono::milliseconds>(te - tb).count()
            << " milliseconds.\n";
    }

    {
        auto tb = TimerType::now() ;
        outB = methodB(in, offset, weight);
        auto te = TimerType::now();
        std::cout << "Total Time (methodB) " << 
            std::chrono::duration_cast<std::chrono::milliseconds>(te - tb).count()
            << " milliseconds.\n";
    }

    {
        auto tb = TimerType::now();
        outC = methodC(in, offset, weight);
        auto te = TimerType::now();
        std::cout << "Total Time (methodC) " <<
            std::chrono::duration_cast<std::chrono::milliseconds>(te - tb).count()
            << " milliseconds.\n";
    }

    {
        auto tb = TimerType::now();
        outD = methodD(in, offset, weight);
        auto te = TimerType::now();
        std::cout << "Total Time (methodD) " <<
            std::chrono::duration_cast<std::chrono::milliseconds>(te - tb).count()
            << " milliseconds.\n";
    }

    // Normalize differences by the serial result's L1 norm so the printed
    // values are relative errors rather than absolute sums.
    norm = outB.cwiseAbs().sum();

    std::cout << "sum|A-B| = " << (outA - outB).cwiseAbs().sum() / norm << std::endl;
    std::cout << "sum|B-C| = " << (outB - outC).cwiseAbs().sum() / norm << std::endl;
    std::cout << "sum|C-D| = " << (outC - outD).cwiseAbs().sum() / norm << std::endl;
    std::cout << "sum|A-C| = " << (outA - outC).cwiseAbs().sum() / norm << std::endl;
    std::cout << "sum|B-D| = " << (outB - outD).cwiseAbs().sum() / norm << std::endl;
    std::cout << "sum|A-D| = " << (outA - outD).cwiseAbs().sum() / norm << std::endl;

    return 0;
}

Upvotes: 3

Avi Ginsburg
Avi Ginsburg

Reputation: 10596

You have one main issue: threads waiting for one another. The atomic operation causes all other threads to wait until the current thread finishes the operation. Of the 2.22 seconds it takes on my computer, more than 2 seconds is spent waiting for the other threads to finish. In this simple case, the simplest way to fix it is to give each thread its own copy of the matrix to work on and sum them afterwards. I realize that the memory requirements increase dramatically and might not be relevant, but just to prove the point, I've added a methodD to your code and changed the timer to work with chrono:

#include <Eigen/Core>
#include <chrono>
#include <random>
#include <iostream>


// Serial reference scatter-add: values.col(offset(i,j)) += weight(i,j) * in.col(j).
Eigen::MatrixXf methodB(const Eigen::MatrixXf& in, const Eigen::MatrixXi& offset, const Eigen::MatrixXf& weight) {
    assert(offset.size() == weight.size());
    // Bug fix: zero-initialize the accumulator; the original left it
    // uninitialized and accumulated onto garbage.
    Eigen::MatrixXf values = Eigen::MatrixXf::Zero(in.rows(), 2000);
    for (int j = 0; j < in.cols(); ++j) {

        for (int i = 0; i < offset.rows(); ++i) {
            int o = offset(i, j); // target output column, assumed within 0..1999
            float w = weight(i, j);

            // Update column o i.e. values.col(o) += w * in.col(j);
            for (int k = 0; k < in.rows(); ++k)
                values(k, o) += w * in(k, j);
        }
    }
    return values;
}

// Parallel scatter-add with per-scalar atomics — the OP's slow approach,
// kept for timing comparison.
Eigen::MatrixXf methodC(const Eigen::MatrixXf& in, const Eigen::MatrixXi& offset, const Eigen::MatrixXf& weight) {
    assert(offset.size() == weight.size());
    // Bug fix: zero-initialize the accumulator (was uninitialized).
    Eigen::MatrixXf values = Eigen::MatrixXf::Zero(in.rows(), 2000);

#pragma omp parallel for
    for (int j = 0; j < in.cols(); ++j) {

        for (int i = 0; i < offset.rows(); ++i) {
            int o = offset(i, j); // target output column, assumed within 0..1999
            float w = weight(i, j);

            // Update column o i.e. values.col(o) += w * in.col(j);
            for (int k = 0; k < in.rows(); ++k)
#pragma omp atomic
                values(k, o) += w * in(k, j);
        }
    }
    return values;
}
// Fast parallel scatter-add: each thread accumulates into a private copy,
// then the copies are reduced under a critical section (no per-element atomics).
Eigen::MatrixXf methodD(const Eigen::MatrixXf& in, const Eigen::MatrixXi& offset, const Eigen::MatrixXf& weight) {
    assert(offset.size() == weight.size());
    // Bug fix (noted by Rutger): both accumulators must be zero-initialized;
    // the original left them uninitialized, yielding undefined output.
    Eigen::MatrixXf global_values = Eigen::MatrixXf::Zero(in.rows(), 2000);

#pragma omp parallel
    {
        // Thread-private accumulator, zero-initialized.
        Eigen::MatrixXf values = Eigen::MatrixXf::Zero(in.rows(), 2000);
#pragma omp for
        for (int j = 0; j < in.cols(); ++j) {

            for (int i = 0; i < offset.rows(); ++i) {
                int o = offset(i, j); // target output column, assumed within 0..1999
                float w = weight(i, j);

                // Update column o i.e. values.col(o) += w * in.col(j);
                for (int k = 0; k < in.rows(); ++k)
                    values(k, o) += w * in(k, j);
            }
        }

// Reduce the per-thread partial sums one thread at a time.
#pragma omp critical
        {
            global_values += values;
        }
    }
    return global_values;
}

// Benchmark driver: times the serial (B), atomic (C), and per-thread-copy (D)
// variants on the OP's problem size and prints absolute pairwise differences.
int main(int argc, char const *argv[]) {
    typedef std::chrono::high_resolution_clock TimerType;

    // Make Eigen safe to use from multiple threads.
    Eigen::initParallel();

    // N ~ 13.8 million columns (the OP reported ~20 million).
    int N = 960 * 720 * 20;

    Eigen::MatrixXf in(11, N);
    in.setRandom();

    Eigen::MatrixXf weight(6, N);
    // Bug fix: the original called in.setRandom() a second time here, leaving
    // `weight` uninitialized.
    weight.setRandom();

    // Random output-column indices in [0, 1999], matching the 2000-column
    // accumulator allocated inside each method.
    std::random_device rd;
    std::mt19937 gen(rd());
    std::uniform_int_distribution<> dis(0, 1999);

    Eigen::MatrixXf outB, outC, outD;

    Eigen::MatrixXi offset(6, N);
    for (int i = 0; i < offset.size(); i++)
        offset(i) = dis(gen);


    // Time each method with an identical harness; scopes keep tb/te local.
    {
        auto tb = TimerType::now() ;
        outB = methodB(in, offset, weight);
        auto te = TimerType::now();
        std::cout << "Total Time (methodB) " << 
            std::chrono::duration_cast<std::chrono::milliseconds>(te - tb).count()
            << " milliseconds.\n";
    }

    {
        auto tb = TimerType::now();
        outC = methodC(in, offset, weight);
        auto te = TimerType::now();
        std::cout << "Total Time (methodC) " <<
            std::chrono::duration_cast<std::chrono::milliseconds>(te - tb).count()
            << " milliseconds.\n";
    }

    {
        auto tb = TimerType::now();
        outD = methodD(in, offset, weight);
        auto te = TimerType::now();
        std::cout << "Total Time (methodD) " <<
            std::chrono::duration_cast<std::chrono::milliseconds>(te - tb).count()
            << " milliseconds.\n";
    }

    // Small nonzero differences are expected for D: parallel reduction changes
    // the float summation order.
    std::cout << "sum|B-C| = " << (outB - outC).cwiseAbs().sum() << std::endl;
    std::cout << "sum|B-D| = " << (outB - outD).cwiseAbs().sum() << std::endl;

    return 0;
}

This gives me:

Total Time (methodB) 2006 milliseconds.
Total Time (methodC) 3469 milliseconds.
Total Time (methodD) 366 milliseconds.
sum|B-C| = 0
sum|B-D| = 1.10737e-037

Upvotes: 3

Related Questions