Reputation: 156
This question is a follow up on questions Expression templates cppcon 2019 and matrix class with expression templates à la cppcon 2019
I finally coded a toy matrix class dense_matrix
with expression templates as I initially wanted :
// I forward declare the class which all matrix classes will derive from
// as I need such a class matrix-related traits
class base_matrix;
//is_matrix_v
template <class T>
struct is_matrix
{
static constexpr bool value = std::derived_from<T, base_matrix>;
};
template <class T>
struct is_matrix<std::vector<T>>
{
// std::vector<T> will be considered as a column or row matrix according to its position in a multiplication
static constexpr bool value = true;
};
template <class T>
constexpr bool is_matrix_v = is_matrix<std::remove_cvref_t<T>>::value;
struct expression {}; // you always need a base for traits right ?
// finally defining what is being a matrix or an expression
template <class T>
concept is_matrix_or_expression =
is_matrix_v<std::remove_cvref_t<T>>
||
std::is_base_of_v<expression, std::remove_cvref_t<T>>;
// defining what a legal binary matrix operation is
template <class A, class B>
constexpr bool is_matrix_binary_op_ok =
is_matrix_or_expression<A>
||
is_matrix_or_expression<B>;
//subscripts() for matrix(i,j)
template <class operand>
auto subscripts(operand const& v, size_t const i, size_t const j) {
if constexpr (is_matrix_or_expression<operand>) {
return v(i, j);
}
else {
return v;
}
}
template <class operand>
auto rows(operand const& m) {
if constexpr (is_matrix_or_expression<operand>) {
return m.rows();
}
else {
return 1;
}
};
template <class operand>
auto cols(operand const& m) {
if constexpr (is_matrix_or_expression<operand>) {
return m.cols();
}
else {
return 1;
}
};
The matrix template expression class, in the fashion of Bowie Owens' cppcon 2019 talk is using variadic operands to handle any-arity operator all at once :
template <class callable, class... operands>
class matrix_expression : public expression
{
std::tuple<operands const &...> args_;
callable f_;
public:
matrix_expression(callable f, operands const&... args) : args_(args...), f_(f) {}
auto& operator()(const size_t i, const size_t j)
{
auto const call_at_index_couple =
[this, i, j](operands const&... a)
{
return f_(subscripts(a, i, j)...);
};
return std::apply(call_at_index_couple, args_);
}
auto operator()(const size_t i, const size_t j) const
{
auto const call_at_index_couple =
[this, i, j](operands const&... a)
{
return f_(subscripts(a, i, j)...);
};
return std::apply(call_at_index_couple, args_);
}
size_t rows() const
{
auto const call_rows =
[](operands const&... a)
{
return std::max(::rows(a)...);
};
return std::apply(call_rows, args_);
}
size_t cols() const
{
auto const call_cols =
[](operands const&... a)
{
return std::max(::cols(a)...);
};
return std::apply(call_cols, args_);
}
};
//defining addition
template <class LHS, class RHS, class = std::enable_if_t<is_matrix_binary_op_ok<LHS, RHS>>>
auto operator+(LHS const& lhs, RHS const& rhs)
{
return matrix_expression
{
//[](auto a, auto b)
//{
// return a + b;
//}
std::plus<>{},
lhs,
rhs
};
}
// and substraction
template <class LHS, class RHS, class = std::enable_if_t<is_matrix_binary_op_ok<LHS, RHS>>>
auto operator-(LHS const& lhs, RHS const& rhs)
{
return matrix_expression
{
//[](auto a, auto b)
//{
// return a - b;
//}
std::minus<>{},
lhs,
rhs
};
}
//finally defining the forward declared base_matrix
class base_matrix : public expression
{
public:
virtual ~base_matrix() = default;
// So we can call matrix(i,j) ...
virtual double& operator()(const size_t i, const size_t j) = 0;
// ... and set matrix(i,j)
virtual double operator()(const size_t i, const size_t j) const = 0;
virtual size_t rows() const = 0;
virtual size_t cols() const = 0;
};
//defining a simple and concrecte dense matrix class
class dense_matrix : public base_matrix
{
size_t rows_;
size_t columns_;
std::vector<double> underlying_vector_;
public:
dense_matrix() : rows_(0), columns_(0) {}
dense_matrix(const size_t rows, const size_t cols)
: rows_(rows), columns_(cols), underlying_vector_(rows* cols) {}
dense_matrix(const size_t rows, const size_t cols, double* ptr)
: rows_(rows), columns_(cols), underlying_vector_(ptr, ptr + (rows * cols)) {}
dense_matrix(std::vector<double>& vector, bool isColumn = true)
{
if (isColumn)
{
rows_ = vector.size();
columns_ = 1;
}
else
{
rows_ = 1;
columns_ = vector.size();
}
underlying_vector_ = std::move(vector);
}
template <class src_type>
dense_matrix(const src_type& src)
: rows_(src.rows()),
columns_(src.cols()),
underlying_vector_(src.rows()* src.cols())
{
for (size_t i = 0; i < src.rows(); ++i)
{
for (size_t j = 0; j < src.cols(); ++j)
{
underlying_vector_[i * columns_ + j] = src(i, j);
}
}
}
template <class src_type>
dense_matrix& operator=(src_type const& src)
{
for (size_t i = 0; i < rows(); ++i)
{
for (size_t j = 0; j < cols(); ++j)
{
underlying_vector_[i * columns_ + j] = src(i, j);
}
}
return *this; // this line was missing in the slides and in the talk
}
// Access
size_t rows() const override { return rows_; }
size_t cols() const override { return columns_; }
double& operator()(const size_t i, const size_t j) override
{
return underlying_vector_[i * columns_ + j];
}
double operator()(const size_t i, const size_t j) const override
{
return underlying_vector_[i * columns_ + j];
}
};
I am ok for the short time being with this class, so that I wanted to test its performance for algebraic operations. Hence I coded a naive matrix class as follows :
class dense_matrix_naive
{
size_t rows_;
size_t columns_;
std::vector<double> underlying_vector_;
public:
const std::vector<double>& underlying_vector() const { return underlying_vector_; }
dense_matrix_naive() : rows_(0), columns_(0) {}
dense_matrix_naive(const size_t rows, const size_t cols)
: rows_(rows), columns_(cols), underlying_vector_(rows* cols) {}
dense_matrix_naive(const size_t rows, const size_t cols, double* ptr)
: rows_(rows), columns_(cols), underlying_vector_(ptr, ptr + (rows * cols)) {}
dense_matrix_naive(std::vector<double>& vector, bool isColumn = true)
{
if (isColumn)
{
rows_ = vector.size();
columns_ = 1;
}
else
{
rows_ = 1;
columns_ = vector.size();
}
underlying_vector_ = std::move(vector);
}
template <class src_type>
dense_matrix_naive(const src_type& src)
: rows_(src.rows()),
columns_(src.cols()),
underlying_vector_(src.rows()* src.cols())
{
for (size_t i = 0; i < src.rows(); ++i)
{
for (size_t j = 0; j < src.cols(); ++j)
{
underlying_vector_[i * columns_ + j] = src(i, j);
}
}
}
template <class src_type>
dense_matrix_naive& operator=(src_type const& src)
{
for (size_t i = 0; i < rows(); ++i)
{
for (size_t j = 0; j < cols(); ++j)
{
underlying_vector_[i * columns_ + j] = src(i, j);
}
}
return *this; // this line was missing in the slides and in the talk
}
// Access
size_t rows() const { return rows_; }
size_t cols() const { return columns_; }
double& operator()(const size_t i, const size_t j)
{
return underlying_vector_[i * columns_ + j];
}
double operator()(const size_t i, const size_t j) const
{
return underlying_vector_[i * columns_ + j];
}
dense_matrix_naive& operator+=(const dense_matrix_naive& rhs)
{
for (size_t i = 0; i < underlying_vector_.size(); ++i)
{
underlying_vector_[i] += rhs.underlying_vector()[i];
}
return *this;
}
dense_matrix_naive& operator-=(const dense_matrix_naive& rhs)
{
for (size_t i = 0; i < underlying_vector_.size(); ++i)
{
underlying_vector_[i] -= rhs.underlying_vector()[i];
}
return *this;
}
};
inline dense_matrix_naive operator+(const dense_matrix_naive& m1, const dense_matrix_naive& m2)
{
auto res(m1);
res += m2;
return res;
}
inline dense_matrix_naive operator-(const dense_matrix_naive& m1, const dense_matrix_naive& m2)
{
auto res(m1);
res -= m2;
return res;
}
Finally, the comparison test :
double
smain()
function beneath/permissive- /ifcOutput "x64\Release\" /GS /GL /W3 /Gy /Zc:wchar_t /Zi /Gm- /O2 /sdl /Fd"x64\Release\vc143.pdb" /Zc:inline /D "NDEBUG" /D "_CONSOLE" /D "_UNICODE" /D "UNICODE" /errorReport:prompt /WX- /Zc:forScope /Gd /Oi /MD /std:c++latest /FC /Fa"x64\Release\" /EHsc /nologo /Fo"x64\Release\" /Ot /Fp"x64\Release\ConsoleApplication1.pch" /diagnostics:column
The main()
source code is :
int main()
{
constexpr size_t nb_ops = 1000;
constexpr size_t square_matrix_sizes[] = { 10, 20, 30, 40, 50, 60, 70, 80, 90,
100, 200, 300, 400, 500, 600, 700, 800, 900,
1000, 5000};
std::cout << "SIZE" << '\t' << '\t' << "NAIVE" << '\t' << '\t' << "EXPR" << "\n";
for (const auto square_matrix_size : square_matrix_sizes)
{
// Setting data
//constexpr size_t square_matrix_size = 800;
typedef std::mt19937 MyRNG;
uint32_t seed_val = 1729;
MyRNG rng;
rng.seed(seed_val);
std::normal_distribution<double> normal_dist(0.0, 1.0);
dense_matrix_naive m1_naive(square_matrix_size, square_matrix_size);
for (size_t i = 0; i < square_matrix_size; ++i)
{
for (size_t j = 0; j < square_matrix_size; ++j)
{
m1_naive(i, j) = normal_dist(rng);
}
}
dense_matrix_naive m2_naive(square_matrix_size, square_matrix_size);
for (size_t i = 0; i < square_matrix_size; ++i)
{
for (size_t j = 0; j < square_matrix_size; ++j)
{
m2_naive(i, j) = normal_dist(rng);
}
}
dense_matrix_naive m3_naive(square_matrix_size, square_matrix_size);
for (size_t i = 0; i < square_matrix_size; ++i)
{
for (size_t j = 0; j < square_matrix_size; ++j)
{
m3_naive(i, j) = normal_dist(rng);
}
}
dense_matrix m1(square_matrix_size, square_matrix_size);
for (size_t i = 0; i < square_matrix_size; ++i)
{
for (size_t j = 0; j < square_matrix_size; ++j)
{
m1(i, j) = normal_dist(rng);
}
}
dense_matrix m2(square_matrix_size, square_matrix_size);
for (size_t i = 0; i < square_matrix_size; ++i)
{
for (size_t j = 0; j < square_matrix_size; ++j)
{
m2(i, j) = normal_dist(rng);
}
}
dense_matrix m3(square_matrix_size, square_matrix_size);
for (size_t i = 0; i < square_matrix_size; ++i)
{
for (size_t j = 0; j < square_matrix_size; ++j)
{
m3(i, j) = normal_dist(rng);
}
}
auto start = std::chrono::steady_clock::now();
for (size_t i = 0; i < nb_ops; ++i)
{
dense_matrix_naive m4_naive = m1_naive + m2_naive + m3_naive;
}
auto end = std::chrono::steady_clock::now();
std::chrono::duration<double> elapsed_seconds = end - start;
std::cout << square_matrix_size << '\t' << '\t' << std::fixed << elapsed_seconds << '\t';
start = std::chrono::steady_clock::now();
for (size_t i = 0; i < nb_ops; ++i)
{
dense_matrix m4 = m1 + m2 + m3;
}
end = std::chrono::steady_clock::now();
elapsed_seconds = end - start;
std::cout << std::fixed << elapsed_seconds << "\n";
}
This main() prints something in this fashion :
SIZE NAIVE EXPR
10 0.001863s 0.000990s
20 0.001125s 0.003765s
30 0.002650s 0.010777s
40 0.002999s 0.022661s
50 0.005942s 0.029157s
60 0.008142s 0.030399s
70 0.008975s 0.051088s
80 0.085463s 0.065621s
90 0.071054s 0.085647s
100 0.042847s 0.109610s
200 0.116581s 0.385381s
300 0.927886s 0.719902s
400 1.030033s 1.617557s
500 1.595740s 2.577250s
600 2.553515s 3.609214s
700 3.650004s 4.948367s
800 4.860585s 6.840540s
900 6.040074s 8.451447s
1000 7.586206s 10.528636s
5000 196.896556s 259.612446s
As soon as the size of matrices is sufficiently significant so that there are enough operations to be performed, the naïve implementation is quasi-systematically executed faster that the one using expression templates, which is not what I expected.
Upvotes: 3
Views: 138
Reputation: 275946
Your "naive" solution over-allocates buffers. But memory allocation in modern OS's cost doesn't scale with the size of the allocation. So you end up with a needless overhead on it, but that overhead doesn't scale with the size of the matrix.
You can get a substantial speedup by doing this:
inline dense_matrix_naive operator+(dense_matrix_naive m1, constdense_matrix_naive& m2)
{
m1 += m2;
return m1;
}
and adding move/copy semantics:
dense_matrix_naive(dense_matrix_naive&&)=default;
dense_matrix_naive& operator=(dense_matrix_naive&&)& =default;
// optional, I sometimes skip them because they are expensive and require
// explicit .copy() method calls:
// dense_matrix_naive(dense_matrix_naive const&)=default;
// dense_matrix_naive& operator=(dense_matrix_naive const&)& =default;
Before hand, if we did:
dense_matrix_naive foo = m1 + m2 + m3;
this is parsed as:
dense_matrix_naive foo = (m1 + m2) + m3
now, your (m1 + m2) allocates a buffer and stores the result into it. Call this tmp
:
auto tmp = m1+m2;
dense_matrix_naive foo = std::move(tmp) + m3
in this step, we then allocate another temporary and store tmp+m3
in it:
auto tmp = m1+m2;
auto tmp2 = std::move(tmp)+m3;
dense_matrix_naive foo = std::move(tmp2);
The constructor for foo then allocates yet another buffer, then copies the elements of tmp2
over to it.
Finally, tmp
and tmp2
are cleaned up.
With my variant the same parsing happens:
auto tmp = m1+m2;
auto tmp2 = std::move(tmp)+m3;
dense_matrix_naive foo = std::move(tmp2);
but the code is designed to do it optimally.
auto tmp = m1+m2;
here, we copy m1
into a temporary, then we add m2
to that temporary using +=
, then we store it in tmp
without further copies.
auto tmp2 = std::move(tmp)+m3;
here, we move tmp
into the first argument of +
(which is a value, so accepts moves, not a const&
that ignores them). We then increment the values +=
by m3
's values, and then using implicit move we directly store that in tmp2
without a buffer allocation.
In the original, we had allocated 2 buffers by this point -- in the rewritten, we have allocated only 1. No buffers are allocated in tmp2
calculation because we end up reusing tmp
's buffer (as we know it will be never used again before it is discarded).
dense_matrix_naive foo = tmp2;
and because we have dense_matrix_naive(dense_matrix_naive&&) = default;
constructor, this is a free operation. tmp2
just becomes foo
.
X operator+(X const& lhs, X const& rhs) {
X tmp = lhs;
tmp+=rhs;
return tmp;
}
this is an anti-pattern, while
X operator+(X lhs, X const& rhs) {
lhs+=rhs;
return lhs;
}
is never worse and usually better. By taking the lhs argument by value, we move the copy to the tmp
into a spot that the caller can know about it. And because of how a+b+c+d+e
is handled by C++ it lets us reuse buffers that the other version could not.
...
So that in detail explains why at small sizes your expression template version is faster. We are avoiding 1 or 2 (I'd have to double check) calls to malloc.
Once your buffers become large, then the fact that your naive implementation is cache-friendly makes a bigger difference.
In both cases, we have your blocks of memory:
a b c d a b c d
e f g h e f g h
i j k l + i j k l
m n o p m n o p
The naive version ends up iterating over this memory in order it is stored in RAM. We take the top left lhs += the top left on the rhs, then the next one, etc. When the matrixes are 5000 wide and 5000 tall, we iterate over 100,000,000 bytes of memory like two marching ants.
After a bit of inlining, converting that into SIMD instructions is trivial.
In the expression template case:
a b c d a b c d a b c d
e f g h e f g h e f g h
i j k l + i j k l + i j k l
m n o p m n o p m n o p
we grab the 3 top left elements and store it in a 4th buffer. Then we grab the b elements and store it in the 4th buffer.
Each of the element accesses is through a virtual
function, which can slow things down or block inlining.
Making a SIMD (lhs[i]+=rhs[i]) is trivial. Making a SIMD(lhs[i]=rhs0[i]+rhs1[i]+rhs2[i]) is much harder.
What more, this operation iterates over 400,000,000 bytes of data total, instead of 200,000,000, and does it along 4 different paths instead of 2. This harms cache coherency.
Now, the naive one with extra buffers ends up touching more memory in total, so that would be an edge to the expression template one - but the cache coherency and SIMD-ability carries the day.
One idea for your expression templates is to rely on repeated looped += instead of one + loop. And maybe make your expression template do a row at a time, instead of an element at time.
To that end, a .get_row( int row_num, T* )
method on your expression template could work.
void get_row( int row_num, T* zeroed_output ) const {
const auto columns = cols();
auto incremental = [row_num, zeroed_output, columns](auto& rhs) {
for (int col_num = 0; col_num < columns; ++col_num) {
zeroed_output[col_num] = f(zeroed_output[col_num], subscripts(rhs, row_nom, col_num));
}
};
std::apply([&incremental](auto...&& rhs) {
(incremental(rhs), ...);
}, args_ );
}
the goal here is to increase cache-coherency on the operation and make it as easy to SIMD optimize as possible.
Then in the constructor for your matrix class, you fetch values from the rhs one row at a time (directly into your output buffer).
...
But really get rid of that virtual stuff, especially during testing.
Upvotes: 1