I am implementing a thread pool where workers sleep when there is no work ready, and the main thread sleeps when workers are busy.
I noticed the worker threads proceed to work after calling `wait()`, even though the main thread did not call `notify_all()`.
The output looks something like this:
```
WORKER WAIT
MAIN SETUP
MAIN POLLS READINESS
WORKER WAIT
WORKER WAIT
WORKER AWAKENS!
WORKER START
ALL WORKERS READY
WORKER AWAKENS!
......
```
Worker function:
```cpp
void TaskSystemParallelThreadPoolSleeping::waitFunc() {
    std::unique_lock<std::mutex> lock(*this->mutex_);
    while(true) {
        this->num_wait++;
        std::cout << "WORKER WAIT" << std::endl;
        this->cond_->wait(lock,
            std::bind(&TaskSystemParallelThreadPoolSleeping::wakeWorker, this));
        std::cout << "WORKER AWAKENS!" << std::endl;
        if (this->done_flag == true) {
            this->mutex_->unlock();
            break;
        }
        this->mutex_->unlock();
        std::cout << "WORKER START" << std::endl;
        while (true) {
            this->mutex_->lock();
            if (this->not_done == 0) { // ALL work done
                if (this->total_work != 0) { // 1st time seen by workers
                    this->total_work = 0;
                    this->num_wait = 0;
                    std::cout << "WORKER WAKE MAIN" << std::endl;
                    this->mutex_->unlock();
                    this->cond_->notify_all();
                }
                this->mutex_->unlock();
                break;
            }
            int total = this->total_work;
            int id = this->work_counter;
            if (id == total) { // NO work initiated or NO work left
                this->mutex_->unlock();
                continue;
            }
            ++(this->work_counter); // increment counter
            this->mutex_->unlock(); // Let others access counters to work
            this->runnable->runTask(id, total); // do work
            this->mutex_->lock();
            --(this->not_done); // decrement counter after work done
            this->mutex_->unlock();
        }
        std::cout << "WORKER DONE" << std::endl;
    }
    std::cout << "WORKER TERMINATE" << std::endl;
}
```
Main thread:
```cpp
void TaskSystemParallelThreadPoolSleeping::run(IRunnable* runnable, int num_total_tasks) {
    //
    // TODO: CS149 students will modify the implementation of this
    // method in Part A. The implementation provided below runs all
    // tasks sequentially on the calling thread.
    // Set-up work
    this->mutex_->lock();
    std::cout << "MAIN SETUP" << std::endl;
    this->runnable = runnable;
    this->work_counter = 0;
    this->not_done = num_total_tasks;
    this->total_work = num_total_tasks;
    // Tell workers there is work
    std::cout << "MAIN POLLS READINESS" << std::endl;
    while (this->num_wait < this->num_T) { // Check if all ready
        this->mutex_->unlock();
        this->mutex_->lock();
    }
    std::cout << "ALL WORKERS READY" << std::endl;
    this->mutex_->unlock();
    this->cond_->notify_all();
    // Wait for workers to complete work
    std::unique_lock<std::mutex> lock(*this->mutex_);
    this->cond_->wait(lock,
        std::bind(&TaskSystemParallelThreadPoolSleeping::wakeMain, this));
    std::cout << "MAIN END" << std::endl;
}
```
Condition to wake worker:
```cpp
bool TaskSystemParallelThreadPoolSleeping::wakeWorker() {
    return (this->done_flag == true ||
            (this->total_work != 0 && this->num_wait == this->num_T));
}
```
Condition to wake main thread:
```cpp
bool TaskSystemParallelThreadPoolSleeping::wakeMain() {
    return this->total_work == 0;
}
```
Thread Pool Constructor:
```cpp
TaskSystemParallelThreadPoolSleeping::TaskSystemParallelThreadPoolSleeping(int num_threads): ITaskSystem(num_threads) {
    //
    // TODO: CS149 student implementations may decide to perform setup
    // operations (such as thread pool construction) here.
    // Implementations are free to add new class member variables
    // (requiring changes to tasksys.h).
    //
    this->num_T = std::max(1, num_threads - 1);
    this->threads = new std::thread[this->num_T];
    this->mutex_ = new std::mutex();
    this->cond_ = new std::condition_variable();
    this->total_work = 0;
    this->not_done = 0;
    this->work_counter = 0;
    this->num_wait = 0;
    this->done_flag = {false};
    for (int i = 0; i < this->num_T; i++) {
        this->threads[i] = std::thread(&TaskSystemParallelThreadPoolSleeping::waitFunc, this);
    }
}
```
Thread pool destructor:
```cpp
TaskSystemParallelThreadPoolSleeping::~TaskSystemParallelThreadPoolSleeping() {
    this->done_flag = true;
    this->cond_->notify_all();
    for (int i = 0; i < this->num_T; i++) {
        this->threads[i].join();
    }
    delete this->mutex_;
    delete[] this->threads;
    delete this->cond_;
}
```
I think the beginning should be:
```
WORKER WAIT
MAIN SETUP
MAIN POLLS READINESS
WORKER WAIT
WORKER WAIT
ALL WORKERS READY
WORKER AWAKENS!
WORKER START
WORKER AWAKENS!
......
```
I.e., workers should only awaken after main's `notify_all()`.
EDIT:
Here's the full log. It seems this self-awakening of workers causes a deadlock later: one of the workers awakens by itself and does all the work, setting `this->num_wait = 0` and `this->total_work = 0`. Therefore all the threads only see `this->num_wait = 1`.
```
WORKER WAIT
MAIN SETUP
MAIN POLLS READINESS
WORKER WAIT
WORKER WAIT
WORKER AWAKENS!
WORKER START
ALL WORKERS READY
WORKER AWAKENS!
WORKER START
WORKER AWAKENS!
WORKER START
WORKER WAKE MAIN
WORKER DONE
WORKER WAIT
MAIN ENDWORKER DONE
WORKER WAIT
WORKER DONE
WORKER WAIT
MAIN SETUP
MAIN POLLS READINESS
ALL WORKERS READY
WORKER AWAKENS!
WORKER START
WORKER AWAKENS!
WORKER AWAKENS!
WORKER START
WORKER START
WORKER WAKE MAIN
WORKER DONE
WORKER WAIT
WORKER DONEMAIN END
MAIN SETUP
MAIN POLLS READINESS
WORKER DONE
WORKER WAIT
WORKER WAIT
WORKER AWAKENS!
WORKER START
ALL WORKERS READY
WORKER AWAKENS!
WORKER START
WORKER AWAKENS!
WORKER START
WORKER WAKE MAIN
WORKER DONEWORKER DONE
WORKER WAIT
WORKER WAIT
WORKER DONE
WORKER WAIT
MAIN END
MAIN SETUP
MAIN POLLS READINESS
ALL WORKERS READY
WORKER AWAKENS!
WORKER START
WORKER AWAKENS!
WORKER START
WORKER AWAKENS!
WORKER START
WORKER WAKE MAIN
WORKER DONE
WORKER WAIT
WORKER DONEWORKER DONE
WORKER WAIT
MAIN END
MAIN SETUP
MAIN POLLS READINESS
WORKER WAIT
WORKER AWAKENS!
WORKER START
WORKER WAKE MAIN
WORKER DONE
WORKER WAIT
ALL WORKERS READY
MAIN END
MAIN SETUP
MAIN POLLS READINESS
```
The reason the worker thread awakens without a notification from the main thread is fairly self-evident: `this->num_wait` is incremented and `this->cond_->wait` is called in the same critical section, without giving up the lock or notifying the main thread in between, and `wait` checks the predicate before it blocks. If `run()` has already set `total_work` by the time the last worker arrives (as in your log), that worker's own increment makes the condition in `wakeWorker()` true, so it passes straight through `wait()`. Hence your observation.
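To see the mechanism in isolation, here is a minimal sketch that reduces the worker loop to its essentials. The function and variable names are hypothetical, not taken from the question's code: each thread increments a counter and then waits for `counter == N` in the same critical section, and nobody calls `notify_all()` until after the first thread has already gotten through `wait()`. It returns how many threads got past `wait()` before any notification was sent.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical reduction of the question's worker loop (names are made up):
// every thread increments num_wait and then waits for
// "num_wait == num_threads" in the same critical section.
// Returns how many threads got past wait() before anyone called notify_all().
int passed_before_any_notify(int num_threads) {
    std::mutex m;
    std::condition_variable cv;
    int num_wait = 0;
    bool notified = false;  // set just before the only notify_all()
    int passed_before_notify = 0;

    std::vector<std::thread> threads;
    for (int i = 0; i < num_threads; ++i) {
        threads.emplace_back([&] {
            std::unique_lock<std::mutex> lock(m);
            ++num_wait;
            // Same shape as wakeWorker(): the last thread to arrive finds the
            // predicate already true, so wait() returns without blocking and
            // without ever releasing the lock.
            cv.wait(lock, [&] { return num_wait == num_threads; });
            if (!notified) {
                ++passed_before_notify;
                // Release the threads that really are blocked so the demo ends.
                notified = true;
                cv.notify_all();
            }
        });
    }
    for (auto& t : threads) t.join();
    return passed_before_notify;
}
```

The result is always 1, regardless of thread count: the last arriver holds the lock from its increment through the predicate check to setting `notified`, so it is the only thread that "wakes up by itself", exactly like the unexpected `WORKER AWAKENS!` line before `ALL WORKERS READY`.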
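(I hope this code is just toy code to play with; it has many other issues. For example, `waitFunc()` calls `mutex_->unlock()` directly on a mutex that the `std::unique_lock` still believes it owns, the inner loop unlocks twice in a row when it wakes the main thread, and after `WORKER DONE` the outer loop goes back to `num_wait++` and `cond_->wait(lock, ...)` without holding the mutex at all. All of these are undefined behavior, so given the manual mutex lock/unlock, it doesn't surprise me that it deadlocks.)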
```cpp
this->num_wait++;
std::cout << "WORKER WAIT" << std::endl;
this->cond_->wait(lock,
    std::bind(&TaskSystemParallelThreadPoolSleeping::wakeWorker, this));
std::cout << "WORKER AWAKENS!" << std::endl;
```
According to https://en.cppreference.com/w/cpp/thread/condition_variable/wait, the predicate overload `condition_variable::wait(lock, pred)` is equivalent to

```cpp
while (!pred()) {
    wait(lock);
}
```

so if `pred()` already returns true, `wait()` returns immediately, without blocking and without ever giving up the lock.
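As a sketch of one possible restructuring (not the answerer's code or a CS149 reference solution), workers can wait for a batch *generation* counter to change instead of for `num_wait == num_T`. Since only `run()` or the destructor can change what workers wait for, no worker can satisfy its own wake-up condition, and all locking goes through `std::unique_lock`/`std::lock_guard` instead of direct `mutex_->lock()`/`unlock()` calls. The class `SleepingPool`, its members, and the `std::function<void(int, int)>` task type (standing in for `IRunnable::runTask(id, total)`) are all hypothetical.

```cpp
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical sketch: a sleeping pool whose workers wait for a new batch
// "generation" rather than for num_wait == num_T.
// run() is assumed to be called from one thread at a time.
class SleepingPool {
public:
    explicit SleepingPool(int num_threads) {
        for (int i = 0; i < num_threads; ++i)
            workers_.emplace_back([this] { workerLoop(); });
    }

    ~SleepingPool() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            shutdown_ = true;  // written under the lock, so workers cannot miss it
        }
        work_cv_.notify_all();
        for (auto& t : workers_) t.join();
    }

    // Runs task(id, total) once for every id in [0, total); returns when all are done.
    void run(int total, std::function<void(int, int)> task) {
        std::unique_lock<std::mutex> lock(mutex_);
        task_ = std::move(task);
        total_ = total;
        next_id_ = 0;
        remaining_ = total;
        ++generation_;  // the only event workers wake up for
        work_cv_.notify_all();
        // The calling thread sleeps (no spin-polling) until the last task finishes.
        done_cv_.wait(lock, [this] { return remaining_ == 0; });
    }

private:
    void workerLoop() {
        unsigned long seen = 0;  // last generation this worker has picked up
        std::unique_lock<std::mutex> lock(mutex_);  // all locking goes through this
        while (true) {
            // A worker cannot make this predicate true on its own: only run()
            // (new generation) or the destructor (shutdown) can.
            work_cv_.wait(lock, [&] { return shutdown_ || generation_ != seen; });
            if (shutdown_) return;
            seen = generation_;
            while (next_id_ < total_) {
                int id = next_id_++;  // claim a task id under the lock
                int total = total_;
                lock.unlock();        // run the task without holding the lock
                task_(id, total);
                lock.lock();
                if (--remaining_ == 0) done_cv_.notify_one();
            }
        }
    }

    std::mutex mutex_;
    std::condition_variable work_cv_;  // workers sleep here between batches
    std::condition_variable done_cv_;  // run() sleeps here while workers are busy
    std::function<void(int, int)> task_;
    int total_ = 0;
    int next_id_ = 0;
    int remaining_ = 0;
    unsigned long generation_ = 0;
    bool shutdown_ = false;
    std::vector<std::thread> workers_;
};
```

Usage would look like `SleepingPool pool(3); pool.run(n, [](int id, int total) { /* task id of total */ });`. Using a separate condition variable for the caller also replaces the unlock/lock spin on `num_wait` in `run()` with a real sleep.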