MSalters

Reputation: 179809

What handlers does Boost.Asio execute behind the scenes?

I've got a very simple Boost.Asio case: an async_read protected by a deadline_timer. I also have a std::atomic_bool DEBUG[2]. The deadline_timer handler sets DEBUG[0]; the async_read handler sets DEBUG[1]. This happens unconditionally, even if the error code is error::operation_aborted.

Now, when I call io_service::run_one() I usually see exactly one of the DEBUG indicators set. However, in at least 10% of the cases, run_one returns 1 yet neither indicator is set, i.e. neither of the two handlers was called. (The handlers' other side effects are missing as well.)

Now run_one is supposed to return the number of handlers executed, so when it returns 1 it must have executed a handler - but which handler, if not mine?

I ask because the io_service object is broken afterwards, even after a .reset().

Relevant code - rather verbose to make the problem clear:

boost::asio::deadline_timer deadline(thread_io_service);
deadline.expires_from_now(boost::posix_time::seconds(timeoutSeconds));
read_counter += 2; // Initialized to 1 in ctor, so always odd.
// C++11: Cannot capture expressions such as this->read_counter.
unsigned read_counter_copy = read_counter;
read_timeout.store(0, std::memory_order_release); // 0 = no timeout.
deadline.async_wait([&, read_counter_copy](boost::system::error_code const&)
    {
        // read_counter_copy is very intentionally captured by value - this timeout applies only to the next read.
        read_timeout.store(read_counter_copy, std::memory_order_release);
        DEBUG[0] = true;
    }
);

// Start reading "asynchronously", wait for completion or timeout:
std::atomic<boost::system::error_code> ec(boost::asio::error::would_block);
size_t len = 0;

boost::asio::async_read(socket, boost::asio::buffer(buffer + byteShift), boost::asio::transfer_exactly(nrBytes),
    [&](boost::system::error_code const& err, size_t bytesTransferred)
{
    len = bytesTransferred;
    ec.store(err, std::memory_order_release);
    DEBUG[1] = true;
}
);

// We only have 5 states to deal with
enum { pending, timeout, read, read_then_timeout, timeout_then_read } state = pending;
for (;;)
{
    if      (state == read_then_timeout) assert(false); // unreachable - breaks directly
    else if (state == timeout_then_read) assert(false); // unreachable - breaks directly
    // [pending, read, timeout] i.e. only one handler has run yet.
    thread_io_service.run_one(); // Don't trust this - check the actual handlers and update state accordingly.
    if (state == pending && read_timeout.load(std::memory_order_acquire) == read_counter)
    {
        state = timeout;
        socket.cancel(); // This will cause the read handler to be called with ec=aborted
        continue;
    }
    if (state == read && read_timeout.load(std::memory_order_acquire) == read_counter)
    {
        state = read_then_timeout;
        break;
    }
    if (state == pending && ec.load(std::memory_order_acquire) != boost::asio::error::would_block)
    {
        state = read;
        deadline.cancel();
        continue;
    }
    if (state == timeout && ec.load(std::memory_order_acquire) != boost::asio::error::would_block)
    {
        state = timeout_then_read; // Might still be a successful read (race condition)
        break;
    }
    // This is the actual problem: neither read nor timeout. 
    // DEBUG == {false,false} when this happens.
    L_NET(warning) << "Boost.Asio spurious return";
}
assert(state == timeout_then_read || state == read_then_timeout);
thread_io_service.reset();

Upvotes: 1

Views: 862

Answers (2)

Tanner Sansbury

Reputation: 51881

The handler being invoked is an intermediate completion handler created as part of the async_read composed operation. Composed operations are implemented in terms of zero or more other operations, and each of these intermediate operations has a completion handler of its own. Furthermore, run_one() does not treat these intermediate completion handlers differently from yours.

When neither the async_wait nor the async_read completion handler is invoked, yet run_one() returns a value indicating that a handler was run, the async_read operation is being composed of at least two async_read_some operations. This can occur if async_read() is initiated when the socket has some data available, but the first read does not satisfy the completion condition. For example, the socket may have some, but not all, of the desired data (e.g. 0 < socket.available() < buffer size).
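To make the mechanism concrete, here is a minimal toy model in plain C++. It is not Boost.Asio code: ToyService, ToySocket and this async_read are hypothetical stand-ins that only mimic the shape of a composed operation. It sketches why run_one() can return 1 without the user's handler having run.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <utility>

// Hypothetical stand-in for io_service: a FIFO of ready handlers.
struct ToyService {
  std::deque<std::function<void()>> ready;

  void post(std::function<void()> h) { ready.push_back(std::move(h)); }

  // Runs at most one ready handler and returns how many were run.
  std::size_t run_one() {
    if (ready.empty()) return 0;
    auto h = std::move(ready.front());
    ready.pop_front();
    h();
    return 1;
  }
};

// Hypothetical stand-in for a socket that delivers `chunk` bytes per read.
struct ToySocket {
  ToyService& service;
  std::size_t chunk;

  void async_read_some(std::function<void(std::size_t)> h) {
    const std::size_t n = chunk;
    service.post([h, n] { h(n); });
  }
};

// Toy composed read: re-issues async_read_some until `wanted` bytes have
// arrived, and only then calls the user's handler.
void async_read(ToySocket& s, std::size_t wanted, std::size_t got,
                std::function<void(std::size_t)> user) {
  s.async_read_some([&s, wanted, got, user](std::size_t n) {
    const std::size_t total = got + n;
    if (total >= wanted)
      user(total);                        // completion condition met
    else
      async_read(s, wanted, total, user); // intermediate handler: read again
  });
}
```

With a 4-byte chunk and 8 bytes wanted, the first run_one() in this model runs only the intermediate handler; the user's handler runs during the second call. A caller that needs the user's handler should therefore loop on its own completion flag rather than on run_one()'s return value.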


Enabling Handler Tracking can sometimes give better insight into which handlers are being invoked. When BOOST_ASIO_ENABLE_HANDLER_TRACKING is defined, Asio writes handler debugging output to standard error, including handler identifiers. Here is an example that demonstrates handler tracking where an async_read composed operation is composed of at least two intermediate operations, and invoking io_service.run_one() invokes an intermediate completion handler:

#include <cassert>    // assert
#include <functional> // std::bind
#include <string>     // std::string
#include <vector>     // std::vector
#define BOOST_ASIO_ENABLE_HANDLER_TRACKING
#include <boost/asio.hpp>

const auto noop = std::bind([]{});

int main()
{
  using boost::asio::ip::tcp;

  // Create all I/O objects.
  boost::asio::io_service io_service;
  tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), 0));
  tcp::socket socket1(io_service);
  tcp::socket socket2(io_service);

  // Connect the sockets.
  socket1.async_connect(acceptor.local_endpoint(), noop);
  acceptor.accept(socket2);
  io_service.run();
  io_service.reset();

  // Write data from socket1 to socket2.
  const std::string data = "example";
  boost::asio::write(socket1, boost::asio::buffer(data));

  // Initiate a composed async_read operation that attempts to
  // read more data than is immediately available.
  assert(socket2.available());  
  std::vector<char> buffer(socket2.available() + 1);
  boost::asio::async_read(socket2, boost::asio::buffer(buffer), noop);

  // Invoke completion handler for intermediate async_read_some
  // operation.
  assert(1 == io_service.run_one());

  // Write more data to the socket, allowing the async_read composed
  // operation to complete.
  boost::asio::write(socket1, boost::asio::buffer(data));
  assert(1 == io_service.run());
}

When run, it produces output similar to the following:

@asio|1477939244.378393|0*1|socket@<address>.async_connect      // 1
@asio|1477939244.378925|>1|ec=system:0                          // 2
@asio|1477939244.379056|<1|                                     // 3
@asio|1477939244.379207|0*2|socket@<address>.async_receive      // 4
@asio|1477939244.379402|>2|ec=system:0,bytes_transferred=7      // 5
@asio|1477939244.379572|2*3|socket@<address>.async_receive      // 6
@asio|1477939244.379749|<2|                                     // 7
@asio|1477939244.379874|>3|ec=system:0,bytes_transferred=1      // 8
@asio|1477939244.380063|<3|                                     // 9
@asio|1477939244.380249|0|socket@<address>.close                // 10
@asio|1477939244.380456|0|socket@<address>.close                // 11
@asio|1477939244.380643|0|acceptor@<address>.close              // 12

It can be read line-by-line as:

  1. From outside a handler (0), invoke socket1.async_connect(), creating handler 1
  2. Enter handler 1 with success
  3. Exit handler 1
  4. From outside a handler (0), invoke socket2.async_receive(), creating handler 2
  5. Enter handler 2 with success and having read 7 bytes
  6. From inside of handler 2, invoke socket2.async_receive(), creating handler 3.
  7. Exit handler 2.
  8. Enter handler 3 with success and having read 1 byte
  9. Exit handler 3
  10. Close socket2
  11. Close socket1
  12. Close acceptor

Upvotes: 2

Dave S

Reputation: 21058

If you're using async_read to read a TCP stream, it sets up an internal handler on async_read_some. When that handler runs, it examines the data and/or the amount of data received so far, and either invokes your handler (on completion or error) or calls async_read_some again.

I am surprised about the broken io_service, though; that might depend on where you call reset(). If you call reset() while handlers are still pending and after the variables captured by reference in the lambdas have gone out of scope, you might invoke UB.
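One common way to sidestep that lifetime hazard is to have the handler own its state instead of referencing locals. The sketch below is plain C++ with a hypothetical HandlerQueue standing in for the pending handlers of an io_service; ReadState and start_read are illustrative names too, not anything from the question's code.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <memory>

// Hypothetical stand-in for the handlers still queued in an io_service.
using HandlerQueue = std::deque<std::function<void()>>;

// State the handler writes to; heap-allocated so it can outlive the caller.
struct ReadState {
  bool done = false;
  std::size_t len = 0;
};

// Queues a handler that captures the state by value (shared ownership),
// so it stays valid even if the handler runs after this function returns.
std::shared_ptr<ReadState> start_read(HandlerQueue& pending) {
  auto state = std::make_shared<ReadState>();
  pending.push_back([state] {
    state->done = true;
    state->len = 7; // pretend 7 bytes were transferred
  });
  return state;
}
```

Because the lambda holds its own shared_ptr, running the queued handler late (or after the caller has dropped its copy) touches live memory. With [&] captures of stack variables, as in the question, the same late run would write through dangling references.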

Upvotes: 2
