Reputation: 340
I'm seeking advice about my approach to the following problem. I have a constant stream of incoming data that I need to append to a buffer, and at every iteration I need to pass the buffered data to a function that accepts a C-style array through a pointer.
Since I'm worried about efficiency, I've been pondering how I could store and manage the data in some sort of circular buffer, while still being able to hand it to that function as sequential raw data.
My current approach can be summarized in the following example:
#include <iostream>
#include <array>
#include <algorithm>
#include <iterator>

void foo(double* arr, int size)
{
    for (int k = 0; k < size; k++)
        std::cout << arr[k] << ", ";
    std::cout << std::endl;
}

int main()
{
    const int size = 20;
    std::array<double, size> buffer{};
    for (double data = 0.0; data < 50.0; data += 1.0)
    {
        // shift everything one slot forward, then overwrite the last element
        std::move(std::next(std::begin(buffer)), std::end(buffer), std::begin(buffer));
        buffer.back() = data;
        foo(buffer.data(), size);
    }
}
In the real use case, the buffer also needs to be padded to a "const" size of data at the beginning (quotes because the size may or may not be known at compile time, but once it is known, it will never change).
I store the data in a std::array (or in a std::vector if the size is not known at compile time) since the data is sequential in memory. When I need to insert new data, I use a forward std::move to shift everything, and then I manually replace the last item. Finally, I just pass std::array::data() and its size to the function.
While at first glance this should work efficiently, reason tells me that because the data is stored sequentially, the whole buffer will still be copied by std::move, so each insert is O(n).
The real buffer size will probably be only in the hundreds, and data arrives at 100 Hz at most, but the problem is that I need the result of the called function as soon as possible, so I don't want to lose time on buffer management (even if we are talking about a few milliseconds, or less). I have many questions about this approach; a sketch of the std::vector variant mentioned above follows for reference.
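For reference, a minimal sketch of that std::vector variant of the same approach (same foo as above; illustrative only, not measured):
#include <iostream>
#include <vector>
#include <algorithm>
#include <iterator>

void foo(double* arr, int size)
{
    for (int k = 0; k < size; k++)
        std::cout << arr[k] << ", ";
    std::cout << std::endl;
}

int main()
{
    int size = 20;                      // only known at runtime here
    std::vector<double> buffer(size);   // zero-initialized, i.e. pre-padded
    for (double data = 0.0; data < 50.0; data += 1.0)
    {
        // same shift-and-append as with std::array
        std::move(std::next(buffer.begin()), buffer.end(), buffer.begin());
        buffer.back() = data;
        foo(buffer.data(), size);
    }
}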
Upvotes: 1
Views: 6079
Reputation: 394
Besides the answer by stribor14, I have two other suggestions. These are based purely on performance, so you won't really find readable or maintainable code here.
My first idea when reading the problem was also to allocate twice the amount of storage but write each value only once. When the second half has been filled, it is copied over the first half. My first instinct was that this could perform better: the same total number of writes happens, but all of the writes are sequential (instead of every second write jumping to another place in the array).
#include <cstddef>
#include <cstring>
#include <array>

// stand-in for the real consumer from the question
void foo(double*, size_t) {}

const size_t buffer_size = 50'000;

int main()
{
    std::array<double, 2 * buffer_size> buffer{};
    double *index = buffer.data();
    double *mid = index + buffer_size;
    for (double data = 0.0; data < 10 * buffer_size; data += 1.0)
    {
        if (index == mid)
        {
            // second half is full: copy it back over the first half and restart
            index = buffer.data();
            std::memcpy(index, mid, buffer_size * sizeof(double));
        }
        // write only into the second half; the window [index, index + buffer_size)
        // always holds the last buffer_size values in order
        *(index++ + buffer_size) = data;
        foo(index, buffer_size);
    }
}
Alternatively, I thought it would be possible to optimize OP's own answer to remove the array accesses. The idea is that buffer[buffer_idx - buffer_size] takes two additions to calculate the location of that value, namely *(buffer + buffer_idx - buffer_size). If buffer_idx contains a pointer, only one addition is needed. This gives the following code:
#include <cstddef>
#include <array>

// stand-in for the real consumer from the question
void foo(double*, size_t) {}

const size_t buffer_size = 50'000;

int main()
{
    std::array<double, buffer_size * 2> buffer{};
    double *index = buffer.data();
    double *mid = buffer.data() + buffer_size;
    for (double data = 0.0; data < 10 * buffer_size; data += 1.0)
    {
        // write each value twice, buffer_size apart
        *index = data;
        *(index + buffer_size) = data;
        ++index;
        // branchless wrap-around when the first half is full
        index -= buffer_size * (index == mid);
        foo(index, buffer_size);
    }
}
It was at this point that I noticed I was going down the rabbit hole of C++ optimization, so we couldn't stop there. To choose which implementation to use, I wanted to run a benchmark. Werner Pirkl gave a good starting point, but running it on our optimized code is nonsensical because the measured times are 0 µs. So let's change it a bit: I wrote an extra loop inside the benchmark to give it some runtime, and came up with:
const int repeats = 1000;
volatile double *ptr;
int duration = 0;
const size_t buffer_size = 50'000;

// ... Set up of the buffers and indices

for (int i = 0; i < repeats; ++i)
{
    auto t1 = std::chrono::high_resolution_clock::now();
    for (double data = 0.0; data < 10 * buffer_size; data += 1.0)
    {
        // ... add data to circular buffer
        ptr = // ... the start of the array
    }
    auto t2 = std::chrono::high_resolution_clock::now();
    duration += std::chrono::duration_cast<std::chrono::microseconds>(t2 - t1).count();
}
(Note the use of a volatile double * to ensure that the raw pointer to the contiguous array is not optimized out.)
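For reference, a self-contained version of this harness, here wired up to the pointer-based variant above, might look like the following (a sketch, not the exact harness behind the numbers below; the static buffer is my assumption to keep the ~800 KB array off the stack):
#include <array>
#include <chrono>
#include <cstddef>
#include <iostream>

const size_t buffer_size = 50'000;
const int repeats = 1000;

volatile double *ptr;  // keeps the window pointer from being optimized out

int main()
{
    // static: 2 * 50'000 doubles (~800 KB) is risky as a local on the stack
    static std::array<double, buffer_size * 2> buffer{};
    long long duration = 0;  // total microseconds over all repeats
    for (int i = 0; i < repeats; ++i)
    {
        double *index = buffer.data();
        double *mid = buffer.data() + buffer_size;
        auto t1 = std::chrono::high_resolution_clock::now();
        for (double data = 0.0; data < 10 * buffer_size; data += 1.0)
        {
            *index = data;
            *(index + buffer_size) = data;
            ++index;
            index -= buffer_size * (index == mid);
            ptr = index;  // the start of the contiguous window
        }
        auto t2 = std::chrono::high_resolution_clock::now();
        duration += std::chrono::duration_cast<std::chrono::microseconds>(t2 - t1).count();
    }
    std::cout << "average: "
              << duration * 1000.0 / (repeats * 10.0 * buffer_size)
              << " ns per insertion" << std::endl;
}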
While running these tests, I noticed that they are very dependent on compiler flags (-O2, -O3, -march=native, ...). I will give some results, but as with all C++ benchmarks, take them with a grain of salt and run your own with a real-world workload. The reported times are the average ns per insertion:
|                   | with `memcpy` | stribor14 | `operator[]` | with pointers |
|-------------------|---------------|-----------|--------------|---------------|
| -O2               | 1.38          | 1.57      | 1.41         | 1.15          |
| -O3               | 1.37          | 1.63      | 1.36         | 1.09          |
| -O3 -march=native | 1.35          | 1.61      | 1.34         | 1.09          |
Needless to say, I was quite disappointed that what I expected to perform best didn't. But as stated earlier, this benchmark is in no way representative of real-world performance.
Upvotes: 1
Reputation: 340
Thank you for the answer, Werner. When I run this solution on Repl.it, I get:
it took an average of 21us and a max of 57382us
For comparison, my original idea with the same buffer size has the following result:
it took an average of 19us and a max of 54129us
This means that my initial approach was indeed naive :)
In the meantime, while waiting for an answer, I've come up with the following solution:
#include <iostream>
#include <array>
#include <algorithm>
#include <chrono>

void foo(double* arr, int size)
{
    for (int k = 0; k < size; k++)
        std::cout << arr[k] << ", ";
    std::cout << std::endl;
}

int main()
{
    const int buffer_size = 20;
    std::array<double, buffer_size * 2> buffer{};
    int buffer_idx = buffer_size;
    for (double data = 0.0; data < 100.0; data += 1.0)
    {
        // write each value twice, buffer_size apart
        buffer.at(buffer_idx - buffer_size) = data;
        buffer.at(buffer_idx++) = data;
        // the window ending at buffer_idx always holds the last buffer_size values
        foo(buffer.data() + buffer_idx - buffer_size, buffer_size);
        // wrap back to the middle once the second half is full
        buffer_idx -= buffer_size * (buffer_idx == buffer_size * 2);
    }
}
Since the size of the buffer is not a problem, I allocate twice the memory needed and insert the data in two places, offset by the buffer size. When I reach the end, I just go back like a typewriter carriage. The idea is that I fake the circular buffer by storing a second copy of the data, so the function can read it as if it had wrapped around the full circle.
For a buffer size of 50000, this gives me the following result, which is exactly what I wanted:
it took an average of 0us and a max of 23us
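Presumably, the `operator[]` column in the benchmark table above is this same scheme with the bounds-checked at() calls replaced by plain indexing; a sketch of that variant:
#include <iostream>
#include <array>

void foo(double* arr, int size)
{
    for (int k = 0; k < size; k++)
        std::cout << arr[k] << ", ";
    std::cout << std::endl;
}

int main()
{
    const int buffer_size = 20;
    std::array<double, buffer_size * 2> buffer{};
    int buffer_idx = buffer_size;
    for (double data = 0.0; data < 100.0; data += 1.0)
    {
        // identical logic, but without the bounds checks of at()
        buffer[buffer_idx - buffer_size] = data;
        buffer[buffer_idx++] = data;
        foo(buffer.data() + buffer_idx - buffer_size, buffer_size);
        buffer_idx -= buffer_size * (buffer_idx == buffer_size * 2);
    }
}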
Upvotes: 1
Reputation: 484
You'll always have to copy your data, as a "contiguous" ring buffer doesn't exist (maybe in some fancy silicon it does).
Also, you can't initialize an array template with a runtime-defined size.
You could use a vector to achieve this:
#include <iostream>
#include <chrono>
#include <vector>

int main() {
    std::vector<double> v;
    // pre-fill it a little
    for (double data = 0.0; data > -50000.0; data -= 1.0) {
        v.push_back(data);
    }
    size_t cnt = 0;
    long long duration = 0;
    long long max = 0;
    for (double data = 0.0; data < 50000.0; data += 1.0, ++cnt) {
        auto t1 = std::chrono::high_resolution_clock::now();
        v.push_back(data);
        v.erase(v.begin());  // O(n): shifts the whole vector forward
        // foo(v.data(), v.size());
        auto t2 = std::chrono::high_resolution_clock::now();
        auto delta = std::chrono::duration_cast<std::chrono::microseconds>(t2 - t1).count();
        duration += delta;
        if (max == 0 || max < delta) {
            max = delta;
        }
    }
    std::cout << "it took an average of " << duration / cnt << "us and a max of " << max << " us" << std::endl;
    return 0;
}
Output:
it took an average of 11us and a max of 245 us
Upvotes: 0