Reputation: 565
Is it possible to vectorize this loop (with g++)?
char x;
int k;
for(int s = 0; s < 4; s++) {
A[k++] += B[x&3];
x >>= 2;
}
A
and B
are pointers to non-overlapping float arrays; B
has indices 0 to 3. I need to maximize portability as this is for an R
package, so the best would be to rewrite in such a way that g++ would be able to vectorize it alone, as I don’t know how to make SSE code portable in this context (the package RcppEigen
makes the library Eigen
available so it is possible).
Many thanks in advance for your thoughts.
P.S. The code in which it is nested looks like
int k = 0;
for(size_t j = 0; j < J; j++) {
char x = data[j];
for(int s = 0; s < 4; s++) {
A[k++] += B[x&3];
x >>= 2;
}
}
Upvotes: 4
Views: 547
Reputation: 1736
Here's my already mentioned c++14 attempt to hack that faster, which yielded less than than zero to nothing; in fact vectorized_op
is roughly 20% slower than unvectorized_op
in the working, and 200% slower in the mocked version.
g++-4.9.2/clang++-3.5.0 on x86:64 Debian Jessie with
-std=c++14 -O3 -Wall -pedantic -msse4.1 -ftree-vectorize -ffast-math
(resp += -stdlib=libc++ for clang)
(updated)
#include <iterator>
#include <algorithm>
#include <valarray>
#include <array>
#include <iostream>
#include <iomanip>
#include <cstdlib>
#include <ctime>
#include <fstream>
using std::size_t;
using std::endl;
auto& Log = std::cout;
using real_type = float;
using ra_type = std::valarray<real_type>;
using ia_type = std::valarray<size_t>;
using workload_type = unsigned char;
using ca_type = std::valarray<workload_type>;
static constexpr bool BUILD_PIVOTS_H = false;
static constexpr size_t B_SIZE = 4;
static constexpr size_t MAX_CHAR = 256;
static constexpr size_t PRECOMPUTED_SIZE = MAX_CHAR * B_SIZE;
static std::array<workload_type, PRECOMPUTED_SIZE> PRECOMPUTED __attribute__ ((aligned(16)));
static std::array<real_type, PRECOMPUTED_SIZE> LUT __attribute__ ((aligned(16)));
struct sentry {
sentry(const std::string& s) {
Log << s << endl;
}
};
void unvectorized_op(ra_type& A, ra_type&B, const ca_type& data) {
static sentry _("unvectorized_op:");
int k = 0;
for(size_t j = 0; j < data.size(); ++j) {
char x = data[j];
#ifdef MOCK
for (auto& i : {0,1,2,3})
A[k++] = real_type(0);
#else
for(int s = 0; s < 4; ++s) {
A[k++] += B[x&3];
x >>= 2;
}
#endif
}
}
void vectorized_op(ra_type& A, ra_type& B, const ca_type& data) {
static sentry _("vectorized_op:");
ra_type bflat(A.size());
auto beg = std::begin(bflat);
std::for_each(std::begin(data), std::end(data),
[&beg] (const workload_type& x) {
const auto& it = &LUT[/*unsigned workload_type)*/(x)*4];
#ifdef MOCK
for (auto& i : {0,1,2,3})
*(beg++) = real_type(0);
#else
std::for_each(it, it + 4,
[&beg] (const real_type& c) {*beg = c; ++beg;}
);
#endif
}
);
A = bflat;
}
int main (int argc, char* argv[]) {
int magnitude = -1;
if (argc > 1) {
try {
magnitude = std::stoi(argv[1]);
} catch (const std::runtime_error& e ){
/*ignore*/
}
}
if ((magnitude < 1) || (magnitude > 10)) {
Log << "no magnitude between 2 and 8 given; choosing 5\n";
magnitude = 5;
}
/* precomputing the pivoting */ {
size_t i = 0;
workload_type x = 0;
std::generate(std::begin(PRECOMPUTED), std::end(PRECOMPUTED),
[&i, &x] () {
workload_type rv = x & 3;
x >>= 2;
++i;
(i % B_SIZE) == 0 ? x = i/B_SIZE : x >>= 2;
return rv;
}
);
}
ra_type B(B_SIZE);
::srand48(::clock());
std::generate(std::begin(B), std::end(B), ::drand48);
std::transform(std::begin(PRECOMPUTED), std::end(PRECOMPUTED),
std::begin(LUT),
[&B] (const workload_type& c) {
return B[c];
});
for (auto &f : {unvectorized_op, vectorized_op}) {
for (int i = 1; i < magnitude; ++i) {
size_t workload_size = pow(10, i);
size_t repetitions = pow(10, magnitude - i);
ca_type data(workload_size);
std::generate(std::begin(data), std::end(data),
[] () {return workload_type(::drand48() * MAX_CHAR);});
ra_type A(0., workload_size * B_SIZE);
auto start = ::clock();
for (unsigned long j = 0; j < repetitions; ++j) {
A = 0.;
f(A, B, data);
}
auto duration = ::clock() - start;
Log << std::setw(8) << repetitions << " repetitions with a "
<< std::setw(8) << workload_size << " workload took "
<< double(duration)/CLOCKS_PER_SEC << "s"
<< endl;
} // for i in magnitudes
} // for auto f
if (BUILD_PIVOTS_H) {
try {
std::ofstream pivots_h("pivots.h");
pivots_h << "#ifndef PIVOTS_H_ \n\n";
pivots_h << "const char PIVOTS[256][4] = {\n";
for (auto i = 0; i < 256; ++i) {
pivots_h << " {";
for (auto j = 0; j < 4; ++j) {
pivots_h << long(PRECOMPUTED[i*4+j]) << (j < 3 ? ", " : "");
}
pivots_h << ("}") << (i < 255 ? ",\n" : "\n");
}
pivots_h << "} __attribute__ ((aligned(16)));\n";
pivots_h << "#endif\n" << std::endl;
} catch (const std::runtime_error& e) {
Log << "ERROR: " << e.what() << std::endl;
}
}
}
And here's the Fortran 95 version:
module setup
implicit none
logical GENERATE_PIVOTS
parameter(GENERATE_PIVOTS = .TRUE.)
integer, parameter :: CHAR_TYPE = 1
integer, parameter :: REAL_TYPE = 4
integer(CHAR_TYPE), dimension(0:255, 0:3) :: PIVOTS
real (REAL_TYPE), dimension(0:255, 0:3) :: LUT
real(REAL_TYPE), dimension(0:3) :: B
contains
subroutine init(B)
implicit none
real(REAL_TYPE), dimension(0:3), intent(in) :: B
integer :: r, c
if (GENERATE_PIVOTS) then
do r = lbound(PIVOTS,1),ubound(PIVOTS,1)
do c = lbound(PIVOTS,2),ubound(PIVOTS,2)
PIVOTS(r,c) = int(iand(ishft(r, -c), 3), kind=1)
end do
end do
else
!> @TODO load PIVOTS FROM file
end if
do r = 0,255
do c = 0,3
LUT = B(PIVOTS(r,c))
end do
end do
end subroutine init
end module setup
module testling
use setup
implicit none
contains
subroutine generate(A, data)
real(REAL_TYPE), dimension(:,:), intent(inout):: A
integer(CHAR_TYPE), dimension(:), intent(in) :: data
integer :: i
!A = LUT(data, :)
forall (i=lbound(A,1):ubound(A,1))
A(i,:) = LUT(data(i), :)
end forall
end subroutine generate
end module testling
subroutine do_test(workload_size, repetitions)
use setup
use testling
implicit none
integer, intent(in) :: workload_size
integer, intent(in) :: repetitions
real(REAL_TYPE), dimension(0:workload_size-1,0:3) :: A
integer(CHAR_TYPE), dimension(0:workload_size-1) :: data
integer(8) :: s, e, cr, cm
integer(4) :: i
real :: random
make_workload : do i = lbound(data,1),ubound(data,1)
call random_number(random)
data(i) = int(floor(random * 256),kind=CHAR_TYPE)
end do make_workload
call system_clock(s, cr, cm)
do i = 1,repetitions
call generate(A, data)
end do
call system_clock(e, cr, cm)
write(*,'(I8, " repetitions for a ",I8," workload took ",F12.10,"s")')&
REPETITIONS, WORKLOAD_SIZE, real(e - s,kind = 8)/cr
end subroutine do_test
program main
use setup
implicit none
integer(8) :: c, cr, cm
integer(4) :: i, seed
character(len=100) :: buffer
integer(1) :: magnitude = -1
integer :: workload_size
integer :: repetitions
if (iargc() > 0) then
call getarg(1, buffer)
read(buffer,*) magnitude
end if
if ((magnitude < 2) .or. (magnitude > 10)) then
write(*,*) "no magnitude between 2 and 8 given; choosing 5"
magnitude = 5
endif
call system_clock(c,cr,cm)
seed = int(c/cr, kind=4)
call random_seed(seed)
fill_B: do i = lbound(B,1),ubound(B,1)
call random_number(B(i))
end do fill_B
call init(B)
do i=1,magnitude-1
workload_size = 10**i
repetitions = 10**(magnitude - i)
call do_test(workload_size, repetitions)
end do
end program main
gfortran-4.9 also called with
-O3 -Wall -pedantic -msse4.1 -ftree-vectorize -ffast-math
Summary
Run with random workloads between 10 and 10M.
The normalized runtime for the unvectorized C++ version and the Fortran version for workloads < 1M is slightly worse than its nominal O(n)
with reference to the workload size, roughly doubles above 1M, but stays slightly worse than the nominal O(n)
with reference the new base for a 10M workload .
The Fortran solution is roughly 40-50% faster than the unvectorized C++ version.
The "vectorized"-labeled C++ function is a mild catastrophy at best; minimum 20% slower than the unvectorized version.
Upvotes: 1
Reputation: 4038
I found another way of optimization with using of SSSE3 instructions:
__m128i _B = _mm_castps_si128(_mm_loadu_ps(B));
__m128i _mul = _mm_setr_epi16(64, 64, 16, 16, 4, 4, 1, 1);
__m128i _and = _mm_set1_epi8(12);
__m128i _or = _mm_setr_epi8(0, 1, 2, 3, 0, 1, 2, 3, 0, 1, 2, 3, 0, 1, 2, 3);
for (size_t j = 0; j < J; j++)
{
const char x = data[j];
__m128i x1 = _mm_set1_epi8(x);
__m128i x2 = _mm_mullo_epi16(x1, _mul);
__m128i x3 = _mm_srli_epi16(x2, 4);
__m128i x4 = _mm_and_si128(x3, _and);
__m128i x5 = _mm_or_si128(x4, _or);
_mm_storeu_ps(A, _mm_add_ps(_mm_loadu_ps(A), _mm_castsi128_ps(_mm_shuffle_epi8(_B, x5))));
A += 4;
}
But I think that previous variants are better.
Upvotes: 3
Reputation: 212959
Depending on how big your outer loop is it might be worthwhile to pre-compute a LUT for the sets of four constants which you are going to be adding in your inner loop:
const float C[256][4] = {
{ B[0], B[0], B[0], B[0] },
{ B[1], B[0], B[0], B[0] },
{ B[2], B[0], B[0], B[0] },
{ B[3], B[0], B[0], B[0] },
{ B[0], B[1], B[0], B[0] },
{ B[1], B[1], B[0], B[0] },
{ B[2], B[1], B[0], B[0] },
{ B[3], B[1], B[0], B[0] },
...
{ B[3], B[3], B[3], B[3] }
} __attribute__ ((aligned(16)));
(your probably want to generate this LUT programmatically, rather than coding it inline as above).
Then the actual processing could be simplified to:
int k = 0;
for(size_t j = 0; j < J; j++) {
const uint8_t x = data[j];
for(int s = 0; s < 4; s++) {
A[k + s] += C[x][s];
}
k += 4;
}
which should vectorize completely, if your compiler is having a good day.
Obviously this is only worthwhile if J
is sufficiently large that the cost of setting up the C
LUT can be amortised over a large number of subsequent iterations.
Upvotes: 2
Reputation: 212959
Without resorting to SIMD intrinsics I don't think there is a great deal you can do, but you may be able to at least encourage the compiler to vectorize the 4 additions per iteration:
int k = 0;
for(size_t j = 0; j < J; j++) {
char x = data[j];
float C[4] __attribute__ ((aligned(16)));
for(int s = 0; s < 4; s++) {
C[s] = B[x&3];
x >>= 2;
}
for(int s = 0; s < 4; s++) {
A[k + s] += C[s]; // this loop should be vectorized
}
k += 4;
}
Upvotes: 2
Reputation: 4038
There is a solution with using of AVX2 :
__m256 _B = _mm256_setr_ps(B[0], B[1], B[2], B[3], B[0], B[1], B[2], B[3]);
__m256i _shift = _mm256_setr_epi32(0, 2, 4, 6, 8, 10, 12, 14);
__m256i _mask = _mm256_set1_epi32(3);
for (size_t j = 0; j < J/2; j++)
{
short x = ((short*)data)[j];
__m256i _index = _mm256_and_si256(_mm256_srlv_epi32(_mm256_set1_epi32(x), _shift), _mask);
_mm256_storeu_ps(A, _mm256_add_ps(_mm256_loadu_ps(A), _mm256_permutevar8x32_ps(_B, _index)));
A += 8;
}
Upvotes: 4