ZFY
ZFY

Reputation: 135

boost asio deadline_timer async_wait(N seconds) twice within N seconds cause operation canceled

What I want is when one message queue receives an int N, the handler function will be called after N seconds. below is my code.

It runs OK if the duration seconds of two near message queue is larger than the int N, but the handler will print "Operation canceled" in one handler when the duration seconds between two received message queues are smaller than N, which is not what I want.

I'd appreciate a lot for any help.

#include <boost/asio.hpp>
#include <zmq.h>
#include <boost/thread.hpp>
#include <iostream>

boost::asio::io_service io_service;

void* context = zmq_ctx_new();
void* sock_pull = zmq_socket(context, ZMQ_PULL);


void handler(const boost::system::error_code &ec) {
    std::cout << "hello, world" << "\t" << ec.message() << std::endl;
}

void run() {
    io_service.run();
}

void thread_listener() {

     int nRecv;
     boost::asio::deadline_timer timer(io_service, boost::posix_time::seconds(0));
     while( true ) {
         zmq_recv(sock_pull, &nRecv, sizeof(nRecv), 0);
         std::cout << nRecv << std::endl;
         timer.expires_from_now(boost::posix_time::seconds(nRecv));
         timer.async_wait(handler);
     }

 }

 int main(int argc, char* argv[]) {

     boost::asio::io_service::work work(io_service);

     zmq_bind(sock_pull, "tcp://*:60000");
     boost::thread tThread(thread_listener);
     boost::thread tThreadRun(run);
     tThread.join();
     tThreadRun.join();
     return 0;

 }

Upvotes: 2

Views: 2840

Answers (1)

sehe
sehe

Reputation: 393239

When you call

timer.expires_from_now(boost::posix_time::seconds(nRecv));

this, as the documentation states, cancels any async timer pending.

If you want to have overlapping requests in flight at a given time, one timer is clearly not enough. Luckily there is a wellknown pattern around bound shared pointers in Asio that you can use to mimick a "session" per response.

Say you define a session to contain it's own private timer:

struct session : boost::enable_shared_from_this<session> {
    session(boost::asio::io_service& svc, int N) :
        timer(svc, boost::posix_time::seconds(N)) 
    {
        // Note: shared_from_this is not allowed from ctor
    }

    void start() {
        // it's critical that the completion handler is bound to a shared
        // pointer so the handler keeps the session alive:
        timer.async_wait(boost::bind(&session::handler, shared_from_this(), boost::asio::placeholders::error));
    }

  private:
    void handler(const boost::system::error_code &ec) {
        std::cout << "hello, world" << "\t" << ec.message() << std::endl;
    }

    boost::asio::deadline_timer timer;
};

Now, it's trivial to replace the code that used the hardcoded timer instance:

 timer.expires_from_now(boost::posix_time::seconds(nRecv));
 timer.async_wait(handler);

with the session start:

 boost::make_shared<session>(io_service, nRecv)->start();

A fully working example (with suitably stubbed ZMQ stuff): Live On Coliru

#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/make_shared.hpp>
#include <iostream>

boost::asio::io_service io_service;

/////////////////////////////////////////////////////////////////////////
// I love stubbing out stuff I don't want to install just to help others
enum { ZMQ_PULL };
static void* zmq_ctx_new()         { return nullptr; }
static void* zmq_socket(void*,int) { return nullptr; }
static void  zmq_bind(void*,char const*) {}
static void  zmq_recv(void*,int*data,size_t,int) 
{ 
    boost::this_thread::sleep_for(boost::chrono::milliseconds(rand()%1000));
    *data = 2;
}
// End of stubs :)
/////////////////////////////////////////////////////////////////////////

void* context  = zmq_ctx_new();
void* sock_pull = zmq_socket(context, ZMQ_PULL);

struct session : boost::enable_shared_from_this<session> {
    session(boost::asio::io_service& svc, int N) :
        timer(svc, boost::posix_time::seconds(N)) 
    {
        // Note: shared_from_this is not allowed from ctor
    }

    void start() {
        // it's critical that the completion handler is bound to a shared
        // pointer so the handler keeps the session alive:
        timer.async_wait(boost::bind(&session::handler, shared_from_this(), boost::asio::placeholders::error));
    }

    ~session() {
        std::cout << "bye (session end)\n";
    }

  private:
    void handler(const boost::system::error_code &ec) {
        std::cout << "hello, world" << "\t" << ec.message() << std::endl;
    }

    boost::asio::deadline_timer timer;
};

void run() {
    io_service.run();
}

void thread_listener() {
    int nRecv = 0;
    for(int n=0; n<4; ++n) {
        zmq_recv(sock_pull, &nRecv, sizeof(nRecv), 0);
        std::cout << nRecv << std::endl;

        boost::make_shared<session>(io_service, nRecv)->start();
    }
}

int main() {
    auto work = boost::make_shared<boost::asio::io_service::work>(io_service);

    zmq_bind(sock_pull, "tcp://*:60000");
    boost::thread tThread(thread_listener);
    boost::thread tThreadRun(run);

    tThread.join();
    work.reset();

    tThreadRun.join();
}

Upvotes: 5

Related Questions