xiaodong

Reputation: 1012

boost::asio::io_service::run in more than one thread

I am trying to implement the Active Object pattern using boost::asio::io_service, but the result is not what I expected.

Here is my code:

#include <boost/asio.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <chrono>
#include <ctime>
#include <functional>
#include <iostream>  // needed for std::cout
#include <memory>
#include <string>
#include <thread>

#define SIZE 10 

class ActiveObject
{
  public:
  ActiveObject()
  {
    executionThread_.reset( new std::thread(    [&]{  service_.run();  }  ) );
  }

    virtual ~ActiveObject()
    {
      // execute all unfinished work in case this object is leaving scope.
      service_.poll();
      service_.stop();
      executionThread_->join();
      std::cout << "active object thread exited" << std::endl;
    }

    void doSomething()
    {
      // post request the io_service to invoke someImpl method and return immediately
      service_.post([=]{ someImpl();});
    }

  protected:
    boost::asio::io_service service_;

  private:
    std::shared_ptr<std::thread> executionThread_;

    void someImpl() {
      std::chrono::milliseconds dura( 200 );
      std::this_thread::sleep_for( dura );
      std::cout << "poll thread id: " << std::this_thread::get_id() << std::endl;
    }

};

int main()
{
  std::cout << "main thread id: " << std::this_thread::get_id() << std::endl;

  ActiveObject obj;

  for(int i=0; i < SIZE; ++i) {
    obj.doSomething(); // call is nonblocking
  }

  std::cout <<  "main thread exited " << std::endl;
  return 0;
}

I want every handler to run on the single thread that calls boost::asio::io_service::run(), but that is not what happens. According to the output, handlers also run on the main thread:

main thread id: 140070853244800
main thread exited 
poll thread id: 140070832256768
poll thread id: 140070853244800
poll thread id: 140070832256768
poll thread id: 140070853244800
poll thread id: 140070832256768
poll thread id: 140070853244800
poll thread id: 140070853244800
poll thread id: 140070832256768
poll thread id: 140070853244800
poll thread id: 140070832256768
active object thread exited

Any ideas? Thanks.

Upvotes: 1

Views: 971

Answers (1)

sehe

Reputation: 392833

At least where you do

service_.poll();
service_.stop();

you are explicitly running handlers on the calling thread, which here is the main thread executing the destructor.

To mitigate, you'd say

service_.post([&] { service_.poll();});
service_.stop();

but then that poll() would be awfully redundant, because it would execute inside run() on the worker thread anyway. This appears to fix your problem.

However, there is a bigger problem:

Why do we even call stop()? run() just completes when all work is done, right? In fact, run() may complete immediately during or after construction, before any work has been posted. In that case, all work would be deferred to the destructor.

However, once run() has completed, the service needs to be reset() before it will execute handlers again, so it is possible that no work runs at all (demo live on Coliru). Oops. That's a nasty race condition.

To prevent run() from completing "prematurely", use a work item (Live On Coliru):

#include <boost/asio.hpp>
#include <boost/optional.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <chrono>
#include <ctime>
#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <thread>

#define SIZE 10

class ActiveObject
{
public:
    ActiveObject()
        : service_(),
          work_(boost::asio::io_service::work(service_))
    {
        executionThread_.reset(new std::thread([&] { service_.run(); }));
    }

    virtual ~ActiveObject()
    {
        // release the work item so that run() returns once all queued handlers have finished
        work_ = boost::none;
        executionThread_->join();
        std::cout << "active object thread exited" << std::endl;
    }

    void doSomething()
    {
        // post request the io_service to invoke someImpl method and return immediately
        service_.post([this] { someImpl();});
    }

protected:
    boost::asio::io_service service_;
    boost::optional<boost::asio::io_service::work> work_;

private:
    std::shared_ptr<std::thread> executionThread_;

    void someImpl()
    {
        std::chrono::milliseconds dura(200);
        std::this_thread::sleep_for(dura);
        std::cout << "poll thread id: " << std::this_thread::get_id() << std::endl;
    }

};

int main()
{
    std::cout << "main thread id: " << std::this_thread::get_id() << std::endl;

    ActiveObject obj;
    std::chrono::milliseconds dura(200);
    std::this_thread::sleep_for(dura);

    for(int i=0; i < SIZE; ++i)
    {
        obj.doSomething(); // call is nonblocking
    }

    std::this_thread::sleep_for(std::chrono::seconds(1));

    std::cout <<  "main thread exited " << std::endl;
}

Output:

main thread id: 140312603891520
poll thread id: 140312580617984
poll thread id: 140312580617984
poll thread id: 140312580617984
poll thread id: 140312580617984
main thread exited 
poll thread id: 140312580617984
poll thread id: 140312580617984
poll thread id: 140312580617984
poll thread id: 140312580617984
poll thread id: 140312580617984
poll thread id: 140312580617984
active object thread exited

Upvotes: 1
