Reputation: 1434
I already have Worker classes and a Handler class to create an abstraction layer for jobs. I wanted to use std::async to pour some asynchrony into the mix, but I got some weird behaviour from my Visual Studio 2012 (Update 1).
My class hierarchy is as follows:
- Worker is an abstract class with Init and Work as pure virtual methods.
- BasicWorker : Worker simply uses printf for some output.
- GroupWorker : Worker is an aggregation of other workers.
- Handler holds on to a Worker to do some job.
Then I make several std::async calls in which I create workers and a handler. In each one I start the handler in a nested std::async call, wait for initialization of the worker (std::condition_variable here), and then stop the handler.
In the end I wait for all of the std::futures to finish.
The code is as follows:
#include <stdio.h>
#include <array>
#include <atomic>
#include <condition_variable>
#include <future>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

struct Worker
{
    virtual ~Worker() { }
    virtual void Init() = 0;
    virtual void Work() = 0;
};

struct BasicWorker : public Worker
{
    virtual ~BasicWorker() { }
    virtual void Init()
    {
        printf("\t\t\t\tInit: %d\n", std::this_thread::get_id());
    }
    virtual void Work()
    {
        printf("\t\t\t\tWork: %d\n", std::this_thread::get_id());
    }
};

struct GroupWorker : public Worker
{
    GroupWorker()
    {
        workers.push_back(std::make_shared<BasicWorker>());
    }
    virtual ~GroupWorker() { }
    virtual void Init()
    {
        for(int i = 0; i < workers.size(); ++i)
        {
            workers[i]->Init();
        }
        initEvent.notify_all();
    }
    virtual void Work()
    {
        for(int i = 0; i < workers.size(); ++i)
        {
            workers[i]->Work();
        }
    }
    void WaitForInit()
    {
        //std::unique_lock<std::mutex> initLock(initMutex);
        //initEvent.wait(initLock);
    }
private:
    std::mutex initMutex;
    std::condition_variable initEvent;
    std::vector<std::shared_ptr<Worker>> workers;
};

struct Handler
{
    static const int Stopped = -1;
    static const int Ready = 0;
    static const int Running = 1;
    Handler(const std::shared_ptr<Worker>& worker) :
        worker(worker)
    { }
    void Start(int count)
    {
        int readyValue = Ready;
        if(working.compare_exchange_strong(readyValue, Running))
        {
            worker->Init();
            for(int i = 0; i < count && working == Running; ++i)
            {
                worker->Work();
            }
        }
    }
    void Stop()
    {
        working = Stopped;
    }
private:
    std::atomic<int> working;
    std::shared_ptr<Worker> worker;
};

std::future<void> Start(int jobIndex, int runCount)
{
    //printf("Start: %d\n", jobIndex);
    return std::async(std::launch::async, [=]()
    {
        printf("Async: %d\n", jobIndex);
        auto worker = std::make_shared<GroupWorker>();
        auto handler = std::make_shared<Handler>(worker);
        auto result = std::async(std::launch::async, [=]()
        {
            printf("Nested async: %d\n", jobIndex);
            handler->Start(runCount);
        });
        worker->WaitForInit();
        handler->Stop();
        result.get();
    });
}

int main()
{
    const int JobCount = 300;
    const int RunCount = 5;
    std::array<std::future<void>, JobCount> jobs;
    for(int i = 0; i < JobCount; ++i)
    {
        jobs[i] = Start(i, RunCount);
    }
    for(int i = 0; i < JobCount; ++i)
    {
        jobs[i].get();
    }
}
My problems are:
- If I uncomment the wait in the WaitForInit@GroupWorker function, then my nested asynchronous function calls are not made until all first-level asynchronous function calls are made.
- While waiting on the std::condition_variable, if I increase the number of jobs, creation of new threads seems to get exponentially slower. In my trials, below 100 jobs there is some asynchrony, but above 300 job creation is completely sequential.
- If I uncomment the printf line in the Start method, all nested asynchrony works like a charm.
So,
- What am I doing wrong in usage of std::condition_variable?
- Why creating jobs gets slower for like 100s of threads? (this question is optional, seems like a problem of OS and can be fixed with a smart thread-pool concept)
- What has printf to do with any of this? (I tried deleting all printf calls in the case of a race condition and I put a breakpoint in the code but no help. It is the same case with std::cout too)
Edit:
I have added a launch policy (as suggested by Jonathan Wakely) to ensure a thread is created, but that did not help either. I am currently creating a std::thread and calling thread::join to wait inside the first-level async.
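For readers trying the same workaround, it might look roughly like the sketch below. StartWithThread and the doubling "work" are hypothetical placeholders (they stand in for creating the handler and calling handler->Start), not code from the program above.

```cpp
#include <future>
#include <thread>

// Hypothetical variant of Start(): the outer task still uses std::async,
// but the nested task runs on an explicitly created std::thread.
std::future<int> StartWithThread(int jobIndex)
{
    return std::async(std::launch::async, [jobIndex]() -> int
    {
        int result = 0;
        // Placeholder for handler->Start(runCount): any nested work goes here.
        std::thread nested([&result, jobIndex] { result = jobIndex * 2; });
        // join() replaces result.get() on the nested future.
        nested.join();
        return result;
    });
}
```

Because the nested work always gets its own thread here, it does not depend on how the implementation schedules a second std::async call.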
Upvotes: 2
Views: 1654
Reputation: 171383
N.B. it's OK to call printf, but not to assume std::thread::id is convertible to int. You could make that slightly more portable like so:
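#include <sstream>
#include <string>
#include <thread>

// Formats the id via operator<< and parses the resulting text as a long.
inline long tol(std::thread::id id)
{
    std::ostringstream ss;
    ss << id;
    return std::stol(ss.str());
}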
(This still assumes the string value of std::thread::id can be converted to long, which isn't required, but is more likely than assuming an implicit conversion to int.)
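If the numeric conversion is a concern, one alternative is to keep the id as text and print it with %s. This is only a sketch: ThreadIdToString and PrintWork are hypothetical names, and the only thing relied on is that std::thread::id can be streamed with operator<<.

```cpp
#include <cstdio>
#include <sstream>
#include <string>
#include <thread>

// Hypothetical helper: formats a thread id through operator<<, without
// assuming the resulting text is a number.
inline std::string ThreadIdToString(std::thread::id id)
{
    std::ostringstream ss;
    ss << id;
    return ss.str();
}

// Hypothetical replacement for the printf call in BasicWorker::Work.
inline void PrintWork()
{
    std::printf("\t\t\t\tWork: %s\n",
                ThreadIdToString(std::this_thread::get_id()).c_str());
}
```

The output format of the id is implementation-defined, so the text is useful for telling threads apart in a log and nothing more.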
What am I doing wrong in usage of std::condition_variable?
You have no "condition" that you're waiting for and no synchronisation to ensure the call to notify_all happens before the calls to wait. You should have a member variable that says "this worker has been init'd" which is set by Init, and only wait on the condition variable if it's not true (that flag should be atomic or guarded by a mutex, to prevent data races).
Why creating jobs gets slower for like 100s of threads? (this question is optional, seems like a problem of OS and can be fixed with a smart thread-pool concept)
Because with hundreds of threads there is a lot of contention for shared resources and a lot of pressure on the OS scheduler, so the implementation probably decides to start returning deferred functions (i.e. as though std::async was called with std::launch::deferred) instead of async ones. Your code assumes async will not return deferred functions: if an async worker and its nested async worker are both run as deferred functions, the program could deadlock, because the outer function blocks waiting for the nested one to call Init, but the nested function never runs until the outer one calls result.get(). Your program is not portable and only works on Windows because (if I understand correctly) the MSVC async uses a work-stealing thread pool which will run a deferred function if a thread becomes available for it. This is not required by the standard. If you want to force each worker to have a new thread, use the std::launch::async policy.
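One way to check which kind of future a particular call produced is to poll it with a zero timeout. The sketch below assumes a standard-conforming wait_for; IsDeferred is a hypothetical helper name.

```cpp
#include <chrono>
#include <future>

// Hypothetical helper: true if the future refers to a deferred function,
// i.e. one that only runs when get() or wait() is called on it.
template <typename T>
bool IsDeferred(const std::future<T>& f)
{
    return f.wait_for(std::chrono::seconds(0)) == std::future_status::deferred;
}
```

For example, a future obtained with std::launch::deferred reports true, while one obtained with std::launch::async reports false. With no explicit policy the implementation is allowed to choose either.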
What has printf to do with any of this? (I tried deleting all printf calls in the case of a race condition and I put a breakpoint in the code but no help. It is the same case with std::cout too)
It imposes a slight delay, and probably some form of unreliable ordering between threads, since they are now contending, and probably racing, for a single global resource. The delay imposed by the printf might be enough for one thread to finish, which releases its resources to the thread pool and allows another async worker to run.
Upvotes: 1