gexicide
gexicide

Reputation: 40048

How to stop boost::asio async reads from getting mixed up?

I am using boost::asio::ip::tcp::socket to receive data. I need an interface which allows me to specify a buffer and call a completion handler once this buffer is filled asynchronously.

When reading from sockets, we can use the async_read_some method. However, the async_read_some method may read less than the requested number of bytes, so it must call itself with the rest of the buffer if this happens. Here is my current approach:

template<typename CompletionHandler>
void read(boost::asio::ip::tcp::socket* sock, char* target, size_t size, CompletionHandler completionHandler){

   struct ReadHandler {
      boost::asio::ip::tcp::socket* sock;
      char* target;
      size_t size;
      CompletionHandler completionHandler;

      ReadHandler(ip::tcp::socket* sock, char* target, size_t size, CompletionHandler completionHandler)
      : sock(sock),target(target),size(size),completionHandler(completionHandler){}
      // either request the remaining bytes or call the completion handler
      void operator()(const boost::system::error_code& error, std::size_t bytes_transferred){
         if(error){
            return;
         }

        if(bytes_transferred  < size){
           // Read incomplete
           char* newTarg =target+bytes_transferred;
           size_t newSize = size-bytes_transferred;
           sock->async_read_some(boost::asio::buffer(newTarg, newSize), ReadHandler(sock,newTarg,newSize,completionHandler));
           return;
        } else {
           // Read complete, call handler
           completionHandler();
        }
      }
   };

   // start first read
   sock->async_read_some(boost::asio::buffer(target, size), ReadHandler(this,target,size,completionHandler));
}

So basically, we call async_read_some until the whole buffer is filled, then we call the completion handler. So far so good. However, I think that things get mixed up once I call this method more than once before the first call finishes a receive:

void thisMayFail(boost::asio::ip::tcp::socket* sock){
   char* buffer1 = new char[128];
   char* buffer2 = new char[128];
   read(sock, buffer1, 128,[](){std::cout << "Buffer 1 filled";});
   read(sock, buffer2, 128,[](){std::cout << "Buffer 2 filled";});
}

of course, the first 128 received bytes should go into the first buffer and the second 128 should go into the second. But in my understanding, it may be the case that this does not happen here:

Suppose the first async_read_some returns only 70 bytes, then it would issue a second async_read_some with the remaining 58 bytes. However, this read will be queued behind the second 128 byte read(!), so the first buffer will receive the first 70 bytes, the next 128 will go into the second buffer and the final 50 go into the first. I.e., in this case the second buffer would even be filled before the first is filled completely. This may not happen.

How to solve this? I know there is the async_read method, but its documentation says it is simply implemented by calling async_read_some multiple times, so it is basically the same as my read implementation and will not fix the problem.

Upvotes: 1

Views: 1215

Answers (1)

sehe
sehe

Reputation: 392833

You simply can't have two asynchronous read operations active at the same time: that's undefined behaviour.

You can

  • use the free function async_read_until or async_read functions, which already have the higher-level semantics and loop callling the socket's async_read_some until a condition is matched or the buffer is full.

  • use asynchronous operation chaining to sequence the next async read after the first. In short, you initiate the second boost::asio::async_read* call in the completion handler of the first.

    Note:

    • Gives you the opportunity to act on transport errors first too.
    • together the free function interface will both raise the abstraction level of the code and solve the problem (the problem was initiating two simultaneous read operations)
  • use a strand in case you run multiple IO service threads; See Why do I need strand per connection when using boost::asio?

Upvotes: 1

Related Questions