Reputation: 179809
I've got a very simple Boost.Asio case: an async_read protected by a deadline_timer. I also have a std::atomic_bool DEBUG[2]. The deadline_timer handler sets DEBUG[0]; the async_read handler sets DEBUG[1]. This happens unconditionally, even if the error code is error::operation_aborted.
Now, when I call io_service::run_one() I usually see exactly one of the DEBUG indicators set. However, in at least 10% of the cases, run_one returns 1 yet neither of the two indicators is set, i.e. neither of the two handlers was called. (The other side effects of the handlers are missing as well.)
Now run_one is supposed to return the number of handlers executed, so when it returns 1 it must have executed a handler - but which handler, if not mine?
The reason I ask is that even after a .reset(), the io_service object is broken.
Relevant code - rather verbose to make the problem clear:
    boost::asio::deadline_timer deadline(thread_io_service);
    deadline.expires_from_now(boost::posix_time::seconds(timeoutSeconds));
    read_counter += 2; // Initialized to 1 in ctor, so always odd.
    // C++11: Cannot capture expressions such as this->read_counter.
    unsigned read_counter_copy = read_counter;
    read_timeout.store(0, std::memory_order_release); // 0 = no timeout.
    deadline.async_wait([&, read_counter_copy](boost::system::error_code const&)
        {
            // read_counter_copy is very intentionally captured by value - this timeout applies only to the next read.
            read_timeout.store(read_counter_copy, std::memory_order_release);
            DEBUG[0] = true;
        }
    );
    // Start reading "asynchronously", wait for completion or timeout:
    std::atomic<boost::system::error_code> ec(boost::asio::error::would_block);
    size_t len = 0;
    boost::asio::async_read(socket, boost::asio::buffer(buffer + byteShift), boost::asio::transfer_exactly(nrBytes),
        [&](boost::system::error_code const& err, size_t bytesTransferred)
        {
            len = bytesTransferred;
            ec.store(err, std::memory_order_release);
            DEBUG[1] = true;
        }
    );
    // We only have 5 states to deal with
    enum { pending, timeout, read, read_then_timeout, timeout_then_read } state = pending;
    for (;;)
    {
        if (state == read_then_timeout) assert(false); // unreachable - breaks directly
        else if (state == timeout_then_read) assert(false); // unreachable - breaks directly
        // [pending, read, timeout] i.e. at most one handler has run so far.
        thread_io_service.run_one(); // Don't trust this - check the actual handlers and update state accordingly.
        if (state == pending && read_timeout.load(std::memory_order_acquire) == read_counter)
        {
            state = timeout;
            socket.cancel(); // This will cause the read handler to be called with ec=aborted
            continue;
        }
        if (state == read && read_timeout.load(std::memory_order_acquire) == read_counter)
        {
            state = read_then_timeout;
            break;
        }
        if (state == pending && ec.load(std::memory_order_acquire) != boost::asio::error::would_block)
        {
            state = read;
            deadline.cancel();
            continue;
        }
        if (state == timeout && ec.load(std::memory_order_acquire) != boost::asio::error::would_block)
        {
            state = timeout_then_read; // Might still be a successful read (race condition)
            break;
        }
        // This is the actual problem: neither read nor timeout.
        // DEBUG == {false,false} when this happens.
        L_NET(warning) << "Boost.Asio spurious return";
    }
    assert(state == timeout_then_read || state == read_then_timeout);
    thread_io_service.reset();
Upvotes: 1
Views: 862
Reputation: 51881
The handler being invoked is an intermediate completion handler created as part of the async_read composed operation. Composed operations are implemented in terms of zero or more other operations, and each of these intermediate operations has a completion handler of its own. Furthermore, run_one() does not treat these intermediate completion handlers differently: each one counts as an executed handler.
If neither the async_wait nor the async_read completion handler is invoked, but run_one() returns indicating that a handler was run, then the async_read operation is being composed of at least two async_read_some operations. This can occur if async_read() is initiated when the socket has some data available, but the first read does not satisfy the completion condition. For example, the socket may have some data, but not all of the desired data (e.g. 0 < socket.available() < buffer size).
Enabling Handler Tracking can sometimes give better insight into which handlers are being invoked. When BOOST_ASIO_ENABLE_HANDLER_TRACKING is defined, Asio writes handler debugging output to standard error, including handler identifiers. Here is an example that demonstrates handler tracking, where an async_read composed operation is composed of at least two intermediate operations and io_service.run_one() invokes an intermediate completion handler:
    #include <cassert>    // assert
    #include <functional> // std::bind
    #include <string>
    #include <vector>
    #define BOOST_ASIO_ENABLE_HANDLER_TRACKING
    #include <boost/asio.hpp>

    const auto noop = std::bind([]{});

    int main()
    {
      using boost::asio::ip::tcp;

      // Create all I/O objects.
      boost::asio::io_service io_service;
      tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), 0));
      tcp::socket socket1(io_service);
      tcp::socket socket2(io_service);

      // Connect the sockets.
      socket1.async_connect(acceptor.local_endpoint(), noop);
      acceptor.accept(socket2);
      io_service.run();
      io_service.reset();

      // Write data from socket1 to socket2.
      const std::string data = "example";
      boost::asio::write(socket1, boost::asio::buffer(data));

      // Initiate a composed async_read operation that attempts to
      // read more data than is immediately available.
      assert(socket2.available());
      std::vector<char> buffer(socket2.available() + 1);
      boost::asio::async_read(socket2, boost::asio::buffer(buffer), noop);

      // Invoke completion handler for intermediate async_read_some
      // operation.
      assert(1 == io_service.run_one());

      // Write more data to the socket, allowing the async_read composed
      // operation to complete.
      boost::asio::write(socket1, boost::asio::buffer(data));
      assert(1 == io_service.run());
    }
When run, it provides output similar to the following:
@asio|1477939244.378393|0*1|[email protected]_connect // 1
@asio|1477939244.378925|>1|ec=system:0 // 2
@asio|1477939244.379056|<1| // 3
@asio|1477939244.379207|0*2|[email protected]_receive // 4
@asio|1477939244.379402|>2|ec=system:0,bytes_transferred=7 // 5
@asio|1477939244.379572|2*3|[email protected]_receive // 6
@asio|1477939244.379749|<2| // 7
@asio|1477939244.379874|>3|ec=system:0,bytes_transferred=1 // 8
@asio|1477939244.380063|<3| // 9
@asio|1477939244.380249|0|[email protected] // 10
@asio|1477939244.380456|0|[email protected] // 11
@asio|1477939244.380643|0|[email protected] // 12
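It can be read line-by-line as:
1. Call socket1.async_connect(), creating handler 1.
2. Enter handler 1 with success.
3. Exit handler 1.
4. Call socket2.async_receive(), creating handler 2.
5. Enter handler 2 with success and 7 bytes received.
6. Handler 2 calls socket2.async_receive(), creating handler 3.
7. Exit handler 2.
8. Enter handler 3 with success and 1 byte received.
9. Exit handler 3.
10. Close socket2.
11. Close socket1.
12. Close acceptor.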
Upvotes: 2
Reputation: 21058
If you're using async_read to read a TCP stream, it internally sets up an internal handler on async_read_some. When that handler is invoked, it examines the data so far and/or the amount of data received, and either invokes your handler on completion or error, or calls async_read_some again.
I am surprised about a broken io_service, however; that might depend on where you call reset. In fact, if you call reset() while handlers are still pending and the references captured by the lambdas have gone out of scope, you might invoke UB when those handlers later run.
Upvotes: 2