user3457135

Reputation: 21

Boost Asio async_read sometimes hangs while reading but not always

I am implementing a small distributed system that consists of N machines. Each of them receives some data from a remote server and then propagates the data to the other N-1 fellow machines. I am using the Boost Asio async_read and async_write to implement this.

I set up a test cluster of N=30 machines. With smaller datasets (75KB to 750KB received per machine), the program always worked. But when I moved on to a slightly larger dataset (7.5MB), I observed strange behavior: at the beginning, reads and writes happened as expected, but after a while some machines hung while others finished, and the number of machines that hung varied with each run. I printed messages in each handler and found that, for the machines that hung, async_read basically stopped completing after a while, so nothing could proceed afterwards. I checked the remote servers, and they all finished writing.

I have tried using a strand to control the order of execution of the async reads and writes, and I also tried using different io_services for read and write. Neither solved the problem. I am pretty desperate. Can anyone help me?

Here is the code for the class that does the read and propagation:

const int TRANS_TUPLE_SIZE=15;
const int TRANS_BUFFER_SIZE=5120/TRANS_TUPLE_SIZE*TRANS_TUPLE_SIZE;
class Asio_Trans_Broadcaster
{
private:
   char buffer[TRANS_BUFFER_SIZE];
   int node_id;
   int mpi_size;
   int mpi_rank;
   boost::asio::ip::tcp::socket* dbsocket;
   boost::asio::ip::tcp::socket** sender_sockets;
   int n_send;
   boost::mutex mutex;
   bool done;
public:
   Asio_Trans_Broadcaster(boost::asio::ip::tcp::socket* dbskt, boost::asio::ip::tcp::socket** senderskts,
        int msize, int mrank, int id)
{
    dbsocket=dbskt;
    count=0;
    node_id=id;
    mpi_size=mpi_rank=-1;
    sender_sockets=senderskts;
    mpi_size=msize;
    mpi_rank=mrank;
    n_send=-1;
    done=false;
}

static std::size_t completion_condition(const boost::system::error_code& error, std::size_t bytes_transferred)
{
    int remain=bytes_transferred%TRANS_TUPLE_SIZE;
    if(remain==0 && bytes_transferred>0)
        return 0;
    else
        return TRANS_BUFFER_SIZE-bytes_transferred;
}


void write_handler(const boost::system::error_code &ec, std::size_t bytes_transferred)
{
    int n=-1;
    mutex.lock();
    n_send--;
    n=n_send;
    mutex.unlock();
    fprintf(stdout, "~~~~~~ @%d, write_handler: %d bytes, copies_to_send: %d\n",
                                    node_id, bytes_transferred, n);
    if(n==0 && !done)
        boost::asio::async_read(*dbsocket,
            boost::asio::buffer(buffer, TRANS_BUFFER_SIZE),
            Asio_Trans_Broadcaster::completion_condition, boost::bind(&Asio_Trans_Broadcaster::broadcast_handler, this,
            boost::asio::placeholders::error,
            boost::asio::placeholders::bytes_transferred));
}

void broadcast_handler(const boost::system::error_code &ec, std::size_t bytes_transferred)
{
    fprintf(stdout, "@%d, broadcast_handler: %d bytes, mpi_size:%d, mpi_rank: %d\n", node_id, bytes_transferred, mpi_size, mpi_rank);
    if (!ec)
    {
        int pos=0;
        while(pos<bytes_transferred && pos<TRANS_BUFFER_SIZE)
        {
            int id=-1;
            memcpy(&id, &buffer[pos], 4);
            if(id<0)
            {
                done=true;
                fprintf(stdout, "@%d, broadcast_handler: done!\n", mpi_rank);
                break;
            }

            pos+=TRANS_TUPLE_SIZE;
        }

        mutex.lock();
        n_send=mpi_size-1;
        mutex.unlock();
        for(int i=0; i<mpi_size; i++)
            if(i!=mpi_rank)
            {
                boost::asio::async_write(*sender_sockets[i], boost::asio::buffer(buffer, bytes_transferred),
                                boost::bind(&Asio_Trans_Broadcaster::write_handler, this,
                                boost::asio::placeholders::error,
                                boost::asio::placeholders::bytes_transferred));
            }
    }
    else
    {
        cerr<<mpi_rank<<" error: "<<ec.message()<<endl;
      delete this;
    }


}

void broadcast()
{
    boost::asio::async_read(*dbsocket,
            boost::asio::buffer(buffer, TRANS_BUFFER_SIZE),
            Asio_Trans_Broadcaster::completion_condition, boost::bind(&Asio_Trans_Broadcaster::broadcast_handler, this,
            boost::asio::placeholders::error,
            boost::asio::placeholders::bytes_transferred));
}
};

Here is the main code running on each machine:

int N=30;
boost::asio::io_service* sender_io_service=new boost::asio::io_service();
boost::asio::io_service::work* p_work=new boost::asio::io_service::work(*sender_io_service);
boost::thread_group send_thread_pool;
for(int i=0; i<NUM_THREADS; i++)
{
    send_thread_pool.create_thread( boost::bind( & boost::asio::io_service::run, sender_io_service ) );
}

boost::asio::io_service* receiver_io_service=new boost::asio::io_service();
shared_ptr<boost::asio::io_service::work> p_work2(new boost::asio::io_service::work(*receiver_io_service));
boost::thread_group thread_pool2;
thread_pool2.create_thread( boost::bind( & boost::asio::io_service::run, receiver_io_service) );

boost::asio::ip::tcp::socket* receiver_socket;
    //establish nonblocking connection with remote server
AsioConnectToRemote(5000, 1, receiver_io_service, receiver_socket, true);

boost::asio::ip::tcp::socket* send_sockets[N];
    //establish blocking connection with other machines
hadoopNodes = SetupAsioConnectionsWIthOthers(sender_io_service, send_sockets, hostFileName, mpi_rank, mpi_size, 3000, false);

Asio_Trans_Broadcaster* db_receiver=new Asio_Trans_Broadcaster(receiver_socket, send_sockets,
mpi_size,  mpi_rank, mpi_rank);

db_receiver->broadcast();
  p_work2.reset();
  thread_pool2.join_all();
  delete p_work;
send_thread_pool.join_all();

Upvotes: 2

Views: 1937

Answers (1)

sehe

Reputation: 394054

I don't know what your code is trying to achieve. There are too many missing bits.

Of course, if the task is to asynchronously send/receive traffic on network sockets, Asio is just the thing for that. It's hard to see what's special about your code.

I'd suggest cleaning up the more obvious problems:

  • there's (almost) no error handling (check your error_code-s!); see the sketch right after this list
  • unless you're on a funny platform, your format strings should use %lu for size_t
  • why do you mess around with raw arrays, with possibly bad sizes, when you can just have a vector?
  • never assume the size of objects if you can use sizeof:

    memcpy(&id, &trans_buffer[pos], sizeof(id));
    
  • come to think of it, it looks like the indexing of buffer is unsafe anyways:

        while(pos < bytes_transferred && pos < TRANS_BUFFER_SIZE)
        {
            int id = -1;
            memcpy(&id, &buffer[pos], sizeof(id));
    

    If e.g. pos == TRANS_BUFFER_SIZE-1 here, the memcpy reads past the end of the buffer and invokes Undefined Behaviour...
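
    A bounds-checked version of that scan could look like this (just a sketch, fragment only; it reuses the names from your class and assumes, as your memcpy does, that each tuple starts with the int id):

        for(std::size_t pos = 0; pos + sizeof(int) <= bytes_transferred; pos += TRANS_TUPLE_SIZE)
        {
            int id = -1;
            memcpy(&id, &buffer[pos], sizeof(id)); // never reads past what was actually received

            if(id < 0)
            {
                done = true;
                break;
            }
        }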

  • why is there so much new going on? You're inviting a hairy class of bugs into your code. As if memory management weren't already the Achilles' heel of low-level coding. Use values, or shared pointers. Never delete this. Ever[1]
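
    If the object really has to manage its own lifetime, the usual Asio idiom is boost::enable_shared_from_this: bind a shared_ptr into every handler instead of calling delete this. A minimal, self-contained sketch of the pattern (a deadline_timer stands in for your socket purely to keep it short; the names are illustrative):

        #include <boost/asio.hpp>
        #include <boost/bind.hpp>
        #include <boost/enable_shared_from_this.hpp>
        #include <boost/make_shared.hpp>
        #include <iostream>

        // The shared_ptr bound into each handler keeps the object alive exactly as
        // long as an operation is outstanding; when the last handler returns, the
        // object destroys itself safely. No delete this anywhere.
        class session : public boost::enable_shared_from_this<session>
        {
        public:
            explicit session(boost::asio::io_service& io) : timer_(io) {}

            void start()
            {
                timer_.expires_from_now(boost::posix_time::milliseconds(100));
                timer_.async_wait(boost::bind(&session::handle, shared_from_this(),
                            boost::asio::placeholders::error));
            }

        private:
            void handle(const boost::system::error_code& ec)
            {
                std::cout << "done: " << ec.message() << "\n";
                // last copy of the shared_ptr dies here
            }

            boost::asio::deadline_timer timer_;
        };

        int main()
        {
            boost::asio::io_service io;
            boost::make_shared<session>(boost::ref(io))->start();
            io.run();
        }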

  • why is there so much repeated code? Why is one thread pool named after the senders and the other just thread_pool2, which contains a single thread? Why is one work item a raw pointer and the other a shared_ptr?

    You could just write:

    struct service_wrap {
        service_wrap(int threads)
            : work(boost::asio::io_service::work(io_service)) // keep run() from returning early
        {
            while(threads--)
                pool.create_thread(boost::bind(&boost::asio::io_service::run, boost::ref(io_service)));
        }
    
        ~service_wrap() {
            io_service.post(boost::bind(&service_wrap::stop, this));
            pool.join_all();
        }
    
    private: // mind the initialization order!
        boost::asio::io_service io_service;
        boost::optional<boost::asio::io_service::work> work;
        boost::thread_group pool;
    
        void stop() { 
            work = boost::none;
        }
    };
    

    So you can simply write:

    service_wrap senders(NUM_THREADS);
    service_wrap receivers(1);
    

    Wow. Did you see that? No more chance of error. If you fix one pool, you fix the other automatically. No more delete-ing one work item and .reset()-ing the other. In short: no more messy code, and less complexity.

  • Use exception-safe locking guards:

    int local_n_send = -1; // not clear naming
    {
        boost::lock_guard<boost::mutex> lk(mutex);
        n_send--;
        local_n_send = n_send;
    }
    
  • the body of broadcast is completely repeated in write_handler(). Why not just call it:

    if(local_n_send == 0 && !done)
        broadcast();
    
  • I think there's still a race condition - not a data race on the access to n_send itself, but the decision to re-broadcast might be wrong if n_send reaches zero after the lock is released. Since broadcast() only starts an async operation, you can simply do it under the lock and get rid of the race condition:

    void write_handler(const error_code &ec, size_t bytes_transferred) {
        boost::lock_guard<boost::mutex> lk(mutex);
    
        if(!(done || --n_send))
            broadcast();
    }
    

    Woop woop. That's three lines of code now. Less code, fewer bugs.
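
Regarding the very first bullet: both your completion_condition and your handlers silently ignore the error_code. A sketch of what checking it could look like, using the names from your class (fragment only):

    static std::size_t completion_condition(const boost::system::error_code& error, std::size_t bytes_transferred)
    {
        if(error)
            return 0; // stop the composed read; the read handler will see the error_code

        std::size_t remain = bytes_transferred % TRANS_TUPLE_SIZE;
        return (bytes_transferred && remain == 0) ? 0 : TRANS_BUFFER_SIZE - bytes_transferred;
    }

    void write_handler(const boost::system::error_code& ec, std::size_t bytes_transferred)
    {
        if(ec)
        {
            std::cerr << "write failed: " << ec.message() << std::endl; // at least log it
            return; // and don't chain another read on a broken connection
        }

        // ... success path as before
    }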

My guess would be that if you diligently scrub the code like this, you will inevitably find your clues. Think of it like looking for a lost wedding ring: you wouldn't leave a mess lying around. Instead, you'd go from room to room and tidy it all up. Throw everything "out" first if need be.

Iff you can make this thing self-contained /and/ reproducible, I'll even debug it further for you!

Cheers

Here's a starting point that I made while looking at the code: Compiling on Coliru

#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <boost/array.hpp>
#include <boost/make_shared.hpp>
#include <boost/ptr_container/ptr_vector.hpp>
#include <iostream>

const/*expr*/ int TRANS_TUPLE_SIZE  = 15;
const/*expr*/ int TRANS_BUFFER_SIZE = 5120 / TRANS_TUPLE_SIZE * TRANS_TUPLE_SIZE;

namespace AsioTrans
{
    using boost::system::error_code;
    using namespace boost::asio;

    typedef ip::tcp::socket             socket_t;
    typedef boost::ptr_vector<socket_t> socket_list;

    class Broadcaster
    {
    private:
        boost::array<char, TRANS_BUFFER_SIZE> trans_buffer;

        int node_id;
        int mpi_rank;

        socket_t&    dbsocket;
        socket_list& sender_sockets;

        int n_send;
        boost::mutex mutex;
        bool done;
    public:
        Broadcaster(
            socket_t& dbskt,
            socket_list& senderskts,
            int mrank,
            int id) : 
                node_id(id),
                mpi_rank(mrank),
                dbsocket(dbskt),
                sender_sockets(senderskts),
                n_send(-1),
                done(false)
        {
            // count=0;
        }

        static size_t completion_condition(const error_code& error, size_t bytes_transferred)
        {
            // TODO FIXME handle the error_code here
            int remain = bytes_transferred % TRANS_TUPLE_SIZE;

            if(bytes_transferred && !remain)
            {
                return 0;
            }
            else
            {
                return TRANS_BUFFER_SIZE - bytes_transferred;
            }
        }

        void write_handler(const error_code &ec, size_t bytes_transferred)
        {
            // TODO handle errors
            // TODO check bytes_transferred
            boost::lock_guard<boost::mutex> lk(mutex);

            if(!(done || --n_send))
                broadcast();
        }

        void broadcast_handler(const error_code &ec, size_t bytes_transferred)
        {
            fprintf(stdout, "@%d, broadcast_handler: %lu bytes, mpi_size:%lu, mpi_rank: %d\n", node_id, bytes_transferred, sender_sockets.size(), mpi_rank);

            if(!ec)
            {
                for(size_t pos = 0; (pos < bytes_transferred && pos < TRANS_BUFFER_SIZE); pos += TRANS_TUPLE_SIZE)
                {
                    int id = -1;
                    memcpy(&id, &trans_buffer[pos], sizeof(id));

                    if(id < 0)
                    {
                        done = true;
                        fprintf(stdout, "@%d, broadcast_handler: done!\n", mpi_rank);
                        break;
                    }
                }

                {
                    boost::lock_guard<boost::mutex> lk(mutex);
                    n_send = sender_sockets.size() - 1;
                }

                for(int i = 0; size_t(i) < sender_sockets.size(); i++)
                {
                    if(i != mpi_rank)
                    {
                        async_write(
                                sender_sockets[i], 
                                buffer(trans_buffer, bytes_transferred),
                                boost::bind(&Broadcaster::write_handler, this, placeholders::error, placeholders::bytes_transferred));
                    }
                }
            }
            else
            {
                std::cerr << mpi_rank << " error: " << ec.message() << std::endl;
                delete this;
            }
        }

        void broadcast()
        {
            async_read(
                    dbsocket,
                    buffer(trans_buffer),
                    Broadcaster::completion_condition, 
                    boost::bind(&Broadcaster::broadcast_handler, this,
                        placeholders::error,
                        placeholders::bytes_transferred));
        }
    };

    struct service_wrap {
        service_wrap(int threads)
            : _work(io_service::work(_service)) // keep run() from returning early
        {
            while(threads--)
                _pool.create_thread(boost::bind(&io_service::run, boost::ref(_service)));
        }

        ~service_wrap() {
            _service.post(boost::bind(&service_wrap::stop, this));
            _pool.join_all();
        }

        io_service& service() { return _service; }

    private: // mind the initialization order!
        io_service                        _service;
        boost::optional<io_service::work> _work;
        boost::thread_group               _pool;

        void stop() { 
            _work = boost::none;
        }
    };

    extern void AsioConnectToRemote(int, int, io_service&, socket_t&, bool);
    extern void SetupAsioConnectionsWIthOthers(io_service&, socket_list&, std::string, int, bool);
}

int main()
{
    using namespace AsioTrans;

    // there's no use in increasing #threads unless there are blocking operations
    service_wrap senders(boost::thread::hardware_concurrency()); 
    service_wrap receivers(1);

    socket_t receiver_socket(receivers.service());
    AsioConnectToRemote(5000, 1, receivers.service(), receiver_socket, true);

    socket_list send_sockets(30);
    /*hadoopNodes =*/ SetupAsioConnectionsWIthOthers(senders.service(), send_sockets, "hostFileName", 3000, false);

    int mpi_rank = send_sockets.size();
    AsioTrans::Broadcaster db_receiver(receiver_socket, send_sockets, mpi_rank, mpi_rank);
    db_receiver.broadcast();
}

[1] No exceptions. Except when there's an exception to the no-exceptions rule. Exception-ception.

Upvotes: 2
