Reputation: 2374
I'm trying to synchronized the completion handlers using strand
but I'm not getting the expected output. When I'm using asio::post
without wrapping the completion handler in strand
I get the correct output but it is not synchronized. When I wrap the completion handler in strand
, I don't receive any output.
Here is the minimal reproducible example:
#include <asio.hpp>
#include <thread>
#include <iostream>
#include <vector>
#include <random>
struct Task
{
Task(int id, int wait_time)
: id_{id}
, wait_time_{wait_time}
{}
void operator()()
{
std::cout << "Tast-" << id_ << " started. [" << std::this_thread::get_id() << "]" << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(wait_time_));
std::cout << "Task-" << id_ << " finished after (" << wait_time_
<< ") milliseconds. [" << std::this_thread::get_id() << "]" << std::endl;
}
int id_;
int wait_time_;
};
int main()
{
std::random_device rd;
std::mt19937 engine(rd());
std::uniform_int_distribution d(500, 2'000);
asio::io_context ctx;
asio::io_context::strand strand(ctx);
std::vector<std::jthread> threads;
auto count = 4;
for (int i = 0; i < count; ++i)
{
threads.emplace_back([&]{ ctx.run(); });
}
for (int i = 0; i < count * 2; ++i)
{
asio::post(Task(i + 1, d(engine)));
// asio::post(strand.wrap(Task(i + 1, d(engine)))); /* THIS DOESN'T PRODUCE ANY OUTPUT */
}
ctx.run();
}
Output when using asio::post(Task(i + 1, d(engine)));
Tast-1 started. [17652]
Tast-7 started. [26096]
Tast-3 started. [56484]
Tast-8 started. [32000]
Tast-5 started. [Tast-6 started. [79448]
61340]Tast-2Tast-4 started. [55696]
started. [84880]
Task-6 finished after (784) milliseconds. [79448]
Task-2 finished after (835) milliseconds. [84880]
Task-1 finished after (923) milliseconds. [17652]
Task-4 finished after (1281) milliseconds. [55696]
Task-3 finished after (1668) milliseconds. [56484]
Task-7 finished after (1763) milliseconds. [26096]
Task-8 finished after (1888) milliseconds. [32000]
Task-5 finished after (1982) milliseconds. [61340]
How to use strand
to synchronize these completion handlers? I'm using asio
standalone on Windows 10 and MSVC compiler.
Upvotes: 1
Views: 326
Reputation: 393769
The other answer identified your race condition.
Instead of manually managing the threads, only to then require a work-guard as well, I'd use the asio::thread_pool
facility instead.
Also, don't use the deprecated strand nested typedef. Instead use a strand<>
executor adaptor:
#include <boost/asio.hpp>
#include <iostream>
#include <random>
namespace asio = boost::asio;
using namespace std::chrono_literals;
using Duration = std::chrono::steady_clock::duration;
struct Task {
int id_;
Duration delay_;
void operator()() const {
static int tid_gen = 0;
thread_local std::string const tid = "[" + std::to_string(++tid_gen) + "] ";
std::cout << tid << "Task-" << id_ << " started" << std::endl;
std::this_thread::sleep_for(delay_);
std::cout << tid << "Task-" << id_ << " finished after " << delay_ / 1.ms << "ms" << std::endl;
}
};
int main() {
auto d = bind(std::uniform_int_distribution(500, 2'000), std::mt19937(std::random_device{}()));
asio::thread_pool ctx(4);
auto strand = make_strand(ctx);
for (int i = 1; i <= 8; ++i)
post(bind_executor(strand, Task{i, 1ms * d()}));
ctx.join();
}
Prints e.g.
[1] Task-1 started
[1] Task-1 finished after 839ms
[1] Task-2 started
[1] Task-2 finished after 1674ms
[1] Task-3 started
[1] Task-3 finished after 1355ms
[1] Task-4 started
[1] Task-4 finished after 1368ms
[1] Task-5 started
[1] Task-5 finished after 1141ms
[1] Task-6 started
[1] Task-6 finished after 771ms
[1] Task-7 started
[1] Task-7 finished after 1445ms
[1] Task-8 started
[1] Task-8 finished after 881ms
Mixing in some non-strand tasks (#9-12):
for (int i = 1; i <= 8; ++i)
post(bind_executor(strand, Task{i, 1ms * d()}));
for (int i = 9; i <= 12; ++i)
post(ctx, Task{i, 1ms * d()});
Prints e.g.
[1] Task-9 started
[2] Task-1 started
[3] Task-10 started
[4] Task-11 started
[3] Task-10 finished after 984ms
[3] Task-12 started
[1] Task-9 finished after 1007ms
[2] Task-1 finished after 1417ms
[2] Task-2 started
[4] Task-11 finished after 1958ms
[2] Task-2 finished after 597ms
[2] Task-3 started
[3] Task-12 finished after 1659ms
[2] Task-3 finished after 1532ms
[2] Task-4 started
[2] Task-4 finished after 639ms
[2] Task-5 started
[2] Task-5 finished after 1576ms
[2] Task-6 started
[2] Task-6 finished after 1617ms
[2] Task-7 started
[2] Task-7 finished after 889ms
[2] Task-8 started
[2] Task-8 finished after 1451ms
Upvotes: 2
Reputation: 21123
You have a race condition in the behavior of the asio::io_context
. If you call io_context::run()
on it with no pending handlers to execute (and no executor_work_guard
objects associated with it), the context simply goes into a stopped state, and returns.
So, what's happening in one case is that you're populating the context before any of the threads start executing run()
, which means there is work to be performed, so it works as expected. Or a thread reaches run()
first, which because there is no work yet, the io_context
enters the stopped state, and nothing is printed.
For a quick test, you can move the thread creation until after your for loop. For a longer running work queue, I recommend looking at the documentation for the work guard, which will prevent the context from stopping.
Upvotes: 2