zahir

Reputation: 1434

Using nested std::async with std::condition_variable in Visual Studio 2012

I already have Worker classes and a Handler class that form an abstraction layer for jobs. I wanted to use std::async to add some asynchrony, but I am getting weird behaviour from Visual Studio 2012 (Update 1).

My class hierarchy is as follows:

I then make several std::async calls. In each one I create a worker and a handler, start the handler in a nested std::async call, wait for the worker to be initialized (this is where the std::condition_variable comes in), and then stop the handler.

In the end I wait for all of the std::futures to finish.

The code is as follows:

#include <stdio.h>
#include <future>
#include <array>
#include <atomic>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

struct Worker
{
    virtual ~Worker() { }
    virtual void Init() = 0;
    virtual void Work() = 0;
};

struct BasicWorker : public Worker
{
    virtual ~BasicWorker() { }
    virtual void Init()
    {
        printf("\t\t\t\tInit: %d\n", std::this_thread::get_id());
    }

    virtual void Work()
    {
        printf("\t\t\t\tWork: %d\n", std::this_thread::get_id());
    }
};

struct GroupWorker : public Worker
{
    GroupWorker()
    {
        workers.push_back(std::make_shared<BasicWorker>());
    }

    virtual ~GroupWorker() { }

    virtual void Init()
    {
        for(int i = 0; i < workers.size(); ++i)
        {
            workers[i]->Init();
        }
        initEvent.notify_all();
    }

    virtual void Work()
    {
        for(int i = 0; i < workers.size(); ++i)
        {
            workers[i]->Work();
        }
    }

    void WaitForInit()
    {
        //std::unique_lock<std::mutex> initLock(initMutex);
        //initEvent.wait(initLock);
    }
private:
    std::mutex initMutex;
    std::condition_variable initEvent;
    std::vector<std::shared_ptr<Worker>> workers;
};

struct Handler
{
    static const int Stopped = -1;
    static const int Ready = 0;
    static const int Running = 1;

    Handler(const std::shared_ptr<Worker>& worker) :
        working(Ready),   // start in the Ready state so Start() can switch to Running
        worker(worker)
    { }

    void Start(int count)
    {
        int readyValue = Ready;
        if(working.compare_exchange_strong(readyValue, Running))
        {
            worker->Init();

            for(int i = 0; i < count && working == Running; ++i)
            {
                worker->Work();
            }
        }
    }

    void Stop()
    {
        working = Stopped;
    }
private:
    std::atomic<int> working;
    std::shared_ptr<Worker> worker;
};

std::future<void> Start(int jobIndex, int runCount)
{
    //printf("Start: %d\n", jobIndex);
    return std::async(std::launch::async, [=]()
    {
        printf("Async: %d\n", jobIndex);
        auto worker = std::make_shared<GroupWorker>();
        auto handler = std::make_shared<Handler>(worker);

        auto result = std::async(std::launch::async, [=]()
        {
            printf("Nested async: %d\n", jobIndex);
            handler->Start(runCount);
        });

        worker->WaitForInit();
        handler->Stop();

        result.get();
    });
}

int main()
{
    const int JobCount = 300;
    const int RunCount =  5;
    std::array<std::future<void>, JobCount> jobs;

    for(int i = 0; i < JobCount; ++i)
    {
        jobs[i] = Start(i, RunCount);
    }

    for(int i = 0; i < JobCount; ++i)
    {
        jobs[i].get();
    }
}

My problem is:

So,

Edit: I have added a launch policy (as suggested by Jonathan Wakely) to ensure that a thread is created, but that did not help either. For now I create a std::thread and wait for it with thread::join inside the first-level async.

Upvotes: 2

Views: 1654

Answers (1)

Jonathan Wakely

Reputation: 171383

N.B. it's OK to call printf, but not to assume std::thread::id is convertible to int. You could make that slightly more portable like so:

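#include <sstream>
#include <string>
#include <thread>

// Converts a thread id to long via its streamed text form.
inline long tol(std::thread::id id)
{
  std::ostringstream ss;
  ss << id;
  return std::stol(ss.str());
}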

(This still assumes the string value of std::thread::id can be converted to long, which isn't required, but it is more likely to work than assuming an implicit conversion to int.)
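If even the conversion to long is a concern, one option is to print the streamed text directly with %s. The following is only a sketch; ThreadIdString and PrintInit are hypothetical names, not part of the question's code. The standard only promises that equal ids produce the same text, not what that text looks like.

```cpp
#include <cstdio>
#include <sstream>
#include <string>
#include <thread>

// Hypothetical helper: returns the implementation-defined text form of a thread id.
std::string ThreadIdString(std::thread::id id)
{
    std::ostringstream ss;
    ss << id;               // operator<< is the only portable way to format an id
    return ss.str();
}

// Hypothetical replacement for the printf call in BasicWorker::Init.
void PrintInit()
{
    std::printf("\t\t\t\tInit: %s\n",
                ThreadIdString(std::this_thread::get_id()).c_str());
}
```

This avoids both the implicit conversion to int and the assumption that the text is numeric.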

What am I doing wrong in usage of std::condition_variable?

You have no "condition" that you're waiting for and no synchronisation to ensure the call to notify_all happens before the calls to wait. You should have a member variable that says "this worker has been init'd" which is set by Init, and only wait on the condition variable if it's not true (that flag should be atomic or guarded by a mutex, to prevent data races).
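A minimal sketch of that advice, assuming a small helper class (InitGate is a hypothetical name, not from the question) that GroupWorker could hold in place of the bare mutex and condition variable. Init would call Signal() and WaitForInit would call Wait(). The flag is set in a constructor initializer rather than in the class body, because Visual Studio 2012 does not support in-class member initializers.

```cpp
#include <condition_variable>
#include <future>
#include <mutex>

// Hypothetical one-shot gate: Wait() returns once Signal() has been called,
// regardless of which of the two happens first.
class InitGate
{
public:
    InitGate() : ready_(false) { }

    // Called by the initializing thread when Init() has finished.
    void Signal()
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            ready_ = true;                  // the condition being waited for
        }
        cv_.notify_all();                   // wake every current waiter
    }

    // Returns immediately if Signal() already ran; otherwise blocks.
    void Wait()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        // The predicate handles both a notify that came first and spurious wakeups.
        cv_.wait(lock, [this] { return ready_; });
    }

    bool IsReady()
    {
        std::lock_guard<std::mutex> lock(mutex_);
        return ready_;
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    bool ready_;
};

// Signals from another thread and waits on this one; returns the final flag.
bool RunHandshake()
{
    InitGate gate;
    auto task = std::async(std::launch::async, [&gate] { gate.Signal(); });
    gate.Wait();        // safe whether Signal() ran before or after this call
    task.get();
    return gate.IsReady();
}
```

Because the flag is checked under the mutex before waiting, a notify_all that arrives before the wait is no longer lost.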

Why does creating jobs get slower with hundreds of threads? (this question is optional, seems like a problem of OS and can be fixed with a smart thread-pool concept)

Because with hundreds of threads there is a lot of contention for shared resources and a lot of pressure on the OS scheduler, so the implementation probably decides to start returning deferred functions (i.e. as though std::async was called with std::launch::deferred) instead of async ones. Your code assumes async will not return deferred functions: if an async worker and its nested async worker are both run as deferred functions, the program could deadlock, because the outer function blocks waiting for the nested one to call Init, but the nested function never runs until the outer one calls result.get(). Your program is not portable and only works on Windows because (if I understand correctly) the MSVC async uses a work-stealing thread pool which will run a deferred function if a thread becomes available for it. This is not required by the standard. If you want to force each worker to have a new thread, use the std::launch::async policy.
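To see which behaviour a given future has, the standard lets you ask with a zero-length wait_for. The sketch below is written against the C++11 standard and has not been checked on Visual Studio 2012; IsDeferred, Answer and the two wrapper functions are hypothetical names used only for illustration.

```cpp
#include <cassert>
#include <chrono>
#include <future>

// Hypothetical helper: true if the future holds a deferred function
// that will only run when get() or wait() is called.
template <typename T>
bool IsDeferred(const std::future<T>& f)
{
    assert(f.valid());
    return f.wait_for(std::chrono::seconds(0)) == std::future_status::deferred;
}

int Answer() { return 42; }

// std::launch::deferred: nothing runs until the result is requested.
bool DeferredPolicyIsDeferred()
{
    auto f = std::async(std::launch::deferred, Answer);
    bool deferred = IsDeferred(f);
    f.get();            // runs Answer() here, on the calling thread
    return deferred;
}

// std::launch::async: the function runs as if on a new thread,
// so the future is never reported as deferred.
bool AsyncPolicyIsDeferred()
{
    auto f = std::async(std::launch::async, Answer);
    bool deferred = IsDeferred(f);
    f.get();
    return deferred;
}
```

Calling IsDeferred on the future returned by the nested std::async call is one way to check whether the deadlock scenario described above can occur on a particular implementation.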

What has printf to do with any of this? (I tried deleting all printf calls in the case of a race condition and I put a breakpoint in the code but no help. It is the same case with std::cout too)

It imposes a slight delay, and probably some form of unreliable ordering between threads since they are now contending, and probably racing, for a single global resource. The delay imposed by the printf might be enough for one thread to finish, which releases its resources to the thread pool and allows another async worker to run.

Upvotes: 1
