Reputation: 5819
I am exploring options for converting a multithreaded C++ application to an asynchronous style of networking, based on Boost.Asio and completion handlers. However, there are two sticking points:
Following my comment to @sehe's answer:
My objective is to have the plug-in asynchronously post work for the server side of my code (via an async write) but have my own thread run the write and read completion handlers, the latter being invoked for server-side responses. I am trying to model the above scenario with two threads: one under the control of the io_context (representing my own thread) and the other not under the control of the io_context (representing the 3D application plug-in thread). Here is my feeble attempt:
#include <iostream>
#include <string>
#include <thread>
#include <boost/asio.hpp>

boost::asio::io_context io_context_;
boost::asio::ip::tcp::socket socket_(io_context_);
boost::asio::streambuf receive_buffer_;

void read_completion_handler(const boost::system::error_code& ec, std::size_t bytes_transferred);
void write_completion_handler(const boost::system::error_code& ec, std::size_t bytes_transferred);

// "plug-in" (not part of io-context) calls this
void sendNextMsg()
{
    io_context_.run();
    while(true)
    {
        std::string msg;
        std::cout << "msg > " << std::flush;
        std::cin >> msg;
        boost::asio::async_write(socket_, boost::asio::buffer(msg + "\n"), write_completion_handler);
    }
}

void write_completion_handler(const boost::system::error_code& ec, std::size_t bytes_transferred)
{
    if(ec)
    {
        std::cerr << "send failed: " << ec.message() << std::endl;
        sendNextMsg();
    }
    else
    {
        std::cout << "Write succeeded" << std::endl;
        boost::asio::async_read_until(socket_, receive_buffer_, "\n", read_completion_handler);
    }
}

void read_completion_handler(const boost::system::error_code& ec, std::size_t bytes_transferred)
{
    if(ec)
    {
        std::cerr << "receive failed: " << ec.message() << std::endl;
    }
    else
    {
        const char* data = boost::asio::buffer_cast<const char*>(receive_buffer_.data());
        std::cout << "Server response: " << data << std::endl;
        receive_buffer_.consume(bytes_transferred);
    }
    sendNextMsg();
}

int main()
{
    boost::system::error_code ec;
    socket_.connect(boost::asio::ip::tcp::endpoint(boost::asio::ip::address::from_string("127.0.0.1"), 7777), ec);
    if(ec)
    {
        std::cerr << "[Connection error] " << ec.message() << std::endl;
        exit(-1);
    }
    std::thread{sendNextMsg}.detach();
    io_context_.run();
    return 0;
}
Outcome: the application establishes a connection with the server but terminates after calling sendNextMsg.
Upvotes: 1
Views: 494
Reputation: 393259
Q. Can one thread make an async write but another thread run its completion handler?
Yes. Asio affords concurrency without threads.
You are in control. You choose how many threads do IO or post tasks. You choose how many execution contexts you use. You choose how many threads service these execution contexts. You choose where handlers may run.
As in all applications that deal with threads you may need to protect shared state from concurrent access. You choose whether that applies to your application, and how to ensure it (queues, strands, classic thread synchronization primitives etc).
Here is the simplest of examples, where one thread posts an operation and the handler runs elsewhere. Assume:
asio::io_context ioc;
std::cout << "Posting from #" << thread_id << std::endl;
post(ioc, []{ std::cout << "Hello from #" << thread_id << std::endl; });
std::thread([&ioc] { ioc.run(); }).join();
Prints:
Posting from #0
Hello from #1
Similarly,
asio::steady_timer tim(ioc, 1s);
std::cout << "Waiting from #" << thread_id << std::endl;
tim.async_wait([](error_code ec){ std::cout << ec.message() << " from #" << thread_id << std::endl; });
Prints:
Waiting from #0
Success from #1
Somewhat more interestingly:
asio::io_context ioc;
asio::thread_pool pool(10);
Here
auto handler = [](auto name, error_code ec) {
    sleep_for(1s);
    println("handler name:", name, " (ec:", ec.message(), ")");
};
asio::steady_timer tim(ioc, 1s);
tim.async_wait(bind(handler, "IO", _1));
tim.async_wait(bind_executor(pool, bind(handler, "Pool", _1)));
ioc.run(); // Note, on main thread!
Would print:
0ms thread #0: Main enter
2000ms thread #0: handler name:IO (ec:Success)
2000ms thread #0: IO done, waiting for pool
3001ms thread #1: handler name:Pool (ec:Success)
3001ms thread #0: Main exit
Adding (optionally serialized) tasks on the pool:
auto task = [](auto name) {
    return [=] { sleep_for(1s); println("task: ", name, " done"); };
};
for (auto ex = pool.get_executor(); std::string i : {"1", "2", "3", "4", "5"})
    asio::post(bind_executor(ex, task("Task " + i)));
for (auto strand = make_strand(pool); std::string i : {"6", "7", "8", "9", "10"})
    asio::post(ioc, bind_executor(strand, task("Serial " + i)));
Prints:
0ms thread #0: Main enter
1000ms thread #1: task: Task 3 done
1000ms thread #2: task: Task 4 done
1000ms thread #3: task: Serial 6 done
1000ms thread #4: task: Task 5 done
1000ms thread #5: task: Task 1 done
1000ms thread #6: task: Task 2 done
2000ms thread #0: handler name:IO (ec:Success)
2000ms thread #0: IO done, waiting for pool
2001ms thread #3: task: Serial 7 done
3001ms thread #7: handler name:Pool (ec:Success)
3001ms thread #3: task: Serial 8 done
4001ms thread #3: task: Serial 9 done
5001ms thread #3: task: Serial 10 done
5002ms thread #0: Main exit
Note: although the "Serial" tasks are serialized on the pool, they are not guaranteed to run on a specific thread. In this case they happen to, because of how Asio optimizes strand execution where possible.
Q. The first thread belongs to a framework (and therefore cannot be part of the async thread pool) but is the only thread that can create work.
This is not even necessarily accurate. E.g. you can invoke poll_one() regularly from a framework thread. Also, you can call asio::thread_pool::attach() from your own threads.
But these may not be relevant to your application.
The framework thread also makes the async write to a socket, but it immediately returns to running framework code, and is therefore unavailable. The second thread, created by my code, is thus available to run the completion handler. Does boost asio have a way for the framework thread to initiate asynchronous IO but have another thread run the resulting completion handler?
There are many ways. For one, you could pass futures to the framework, which can await them where it requires the result, e.g.
std::future<size_t> n = asio::async_read(s, buf, asio::use_future);
// ... later:
framework::handle_result(app_state, n.get());
Note that here n.get() may throw system_error on error.
You can also post any packaged task like that:
std::future<int> answer = post(ioc, std::packaged_task<int()>([] {
    sleep_for(100ms);
    return 42;
}));
Secondly, drawing from the above, you could poll()/poll_one() regularly.
Note: if you bind the executor to the handler at initiation, composed operations are required to run all intermediate handlers there. If this is not what you want/require, consider indirecting like e.g.
asio::async_read(s, buf, [app_state](error_code ec, size_t n) {
    asio::dispatch(framework_executor, [=] {
        framework::handle_result(app_state, ec, n);
    });
});
Q. If the async model does not draw a distinction between compute and IO threads and threads aren't preemptible (unless we are using coroutines) isn't there a real possibility that an IO completion handler, e.g. read, can be indefinitely prevented from running by threads doing compute?
Yes. This is solved by separating threads (like in the examples I already showed) and/or by (priority) queuing tasks so that you can throttle them.
To guard against bit-rot, here is the combined listing of the examples above:
#include <boost/asio.hpp>
#include <atomic>
#include <chrono>
#include <functional>
#include <iomanip>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
namespace asio = boost::asio;
using namespace std::chrono_literals;
using namespace std::placeholders;
using boost::system::error_code;

namespace { // fancy tracing for illustration
    using std::this_thread::sleep_for;
    // atomic, because ids are handed out from several threads
    static std::atomic_int next_thread_id{0};
    thread_local int thread_id = next_thread_id++;
    constexpr auto now = std::chrono::steady_clock::now;
    static auto const start = now();
    static std::mutex console_mx;

    static void println(auto const&... a) {
        std::lock_guard lk(console_mx); // keep lines from different threads intact
        std::cout << std::setw(10) << (now() - start) / 1ms << "ms thread #" << thread_id << ":\t";
        (std::cout << ... << a) << std::endl;
    }
} // namespace

int main() {
    println("Main enter");
    asio::io_context ioc;
    asio::thread_pool pool(10);

    auto handler = [](auto name, error_code ec) {
        sleep_for(1s);
        println("handler name:", name, " (ec:", ec.message(), ")");
    };
    asio::steady_timer tim(ioc, 1s);
    tim.async_wait(bind(handler, "IO", _1));                        // runs where ioc runs
    tim.async_wait(bind_executor(pool, bind(handler, "Pool", _1))); // runs on the pool

    auto task = [](auto name) {
        return [=] { sleep_for(1s); println("task: ", name, " done"); };
    };
    for (auto ex = pool.get_executor(); std::string i : {"1", "2", "3", "4", "5"})
        asio::post(bind_executor(ex, task("Task " + i)));
    for (auto strand = make_strand(pool); std::string i : {"6", "7", "8", "9", "10"})
        asio::post(ioc, bind_executor(strand, task("Serial " + i)));

    ioc.run(); // Note, on main thread!
    println("IO done, waiting for pool");
    pool.join();
    println("Main exit");
}
Upvotes: 2