Reputation: 2418
I am trying to implement a queue based worker using pthread. But I have some confusions about pthread_cond_wait()
.
class Worker {
private:
pthread_t thread;
vector<int> queue;
bool stop;
pthread_mutex_t mutex;
pthread_cond_t cond;
public:
Worker() {
stop = false;
if (pthread_mutex_init(&mutex, NULL) != 0)
{
printf("\n mutex init failed\n");
}
if(pthread_cond_init(&cond,NULL) != 0){
printf("\n cond init failed\n");
}
}
~Worker() {
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&cond);
}
void interrupt(){
printf("Going to inturrupt\n");
pthread_mutex_lock(&mutex);
pthread_cond_signal(&cond); //broadcast also doesn't work
pthread_mutex_unlock(&mutex);
printf("inturrupted \n");
}
void condition_lock(){
pthread_mutex_lock(&mutex);
while(queue.size() == 0){
printf("Entering conditional lock\n");
pthread_cond_wait(&cond,&mutex);
}
pthread_mutex_unlock(&mutex);
}
void *run(){
printf("run\n");
while(!stop){
printf("Going for condition lock");
printf("size: %d\n",queue.size());
condition_lock();
printf("Exit from condition lock");
while(queue.size() > 0){
printf("item: %d\n",queue[0]);
queue.pop_back();
}
}
pthread_exit(NULL);
}
void push(int value){
pthread_mutex_lock(&mutex);
queue.push_back(value);
pthread_mutex_unlock(&mutex);
}
void join(){
void *status;
pthread_join(thread,&status);
}
static void *run_helper(void* context){
return ((Worker *)context)->run();
}
void stop_thread(){
stop = true;
interrupt();
}
void start_thread(Worker worker){
stop = false;
int status = pthread_create(&thread,NULL,run_helper,&worker);
}
};
int main(){
Worker worker;
worker.start_thread(worker);
usleep(500000);
for(int i=0;i<5;i++){
worker.push(i);
worker.interrupt();
usleep(500000);
}
worker.stop_thread();
worker.join();
printf("Thread exit\n");
return 0;
}
run
Going for condition locksize: 0
Entering conditional lock
Going to inturrupt
inturrupted
Going to inturrupt
inturrupted
Going to inturrupt
inturrupted
Going to inturrupt
inturrupted
Going to inturrupt
inturrupted
Going to inturrupt
inturrupted
It's never return from pthread_cond_wait()
. I also don't understand how pthread_mutex_lock()
is working in void interrupt()
method because it should be already locked by void condition_lock()
.
I have changed two changes in code as suggested.
1. use queue.size() == 0 instead of conditional variable.
2. Use mutex lock/unlock during queue.push_back()
Upvotes: 3
Views: 735
Reputation: 264729
Bug here:
void start_thread(Worker worker){ // worker passed by value
// thus it is a copy.
stop = false;
int status = pthread_create(&thread,NULL,
run_helper,&worker); // Address of worker passed to thread.
} // worker destroyed here.
You are passing the worker in by value (thus getting a copy). The thread is running against this copy. But the copy was destroyed on exit of this function (thus mutex and cond are not valid).
Since this
and worker
should be the same thing.
Fix with:
void start_thread(){
stop = false;
int status = pthread_create(&thread, NULL, run_helper, this);
}
This is wrong:
void condition_lock(bool condition){
pthread_mutex_lock(&mutex);
if(condition){
// ^^^^ Should be while(<validate some invariant>)
printf("Entering conditional lock\n");
pthread_cond_wait(&cond,&mutex);
}
pthread_mutex_unlock(&mutex);
}
You really want to pass a function through here. So that the condition variable can validate the condition
each time it is revived.
You are mutating the state of the obect without aquiring the lock.
void push(int value){
queue.push_back(value);
}
There are two threads running through this class. Whenever the state is being modified you need to acquire a lock. This is done in several methods (even modiying stop
should be done under a lock).
Technically this is not the valid target of C function callback.
static void *run_helper(void* context){
return ((Worker *)context)->run();
}
C does not know C++ ABI. pthreads is a C library so the only valid pointer you can pass as a callback is a C function.
I also don't understand how pthread_mutex_lock() is working in void interrupt()
pthread_mutex_lock(&mutex);
pthread_cond_wait(&cond,&mutex); The call to wait releases the lock
on the mutex. When the thread is woken
up it must reaquire the lock before
the thread exits the call pthread_cond_wait()
This allows another thread to lock the
mutex modify state then call the signal
mrthod before releasing the lock.
this allows interupt() to run as expected.
Note: Just because you call signal does not mean the other thread is immediately scheduled for execution (it just becomes available). Your code is in such a tight loop that it may be an issue aquiring the lock it requires to exit the pthread_cond_wait()
function.
Though I left the boring fixes that still need to be done for you. You MUST check the result of all library calls to validate that they worked. If they don't work then the very least you can do is throw an exception.
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include <vector>
#include <iostream>
using namespace std;
// LA: Callback must by C function.
extern "C" void *run_helper(void* context);
class Worker {
private:
pthread_t thread;
vector<int> queue;
bool stop;
pthread_mutex_t mutex;
pthread_cond_t cond;
public:
Worker() {
stop = false;
if (pthread_mutex_init(&mutex, NULL) != 0)
{
printf("\n mutex init failed\n");
}
if(pthread_cond_init(&cond,NULL) != 0){
printf("\n cond init failed\n");
}
}
~Worker() {
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&cond);
}
void interrupt(){
printf("Going to inturrupt\n");
pthread_mutex_lock(&mutex);
pthread_cond_signal(&cond); //broadcast also doesn't work
pthread_mutex_unlock(&mutex);
printf("inturrupted \n");
}
void *run(){
printf("run\n");
pthread_mutex_lock(&mutex);
while(!stop){
printf("Going for condition lock\n");
printf("size: %lu\n",queue.size());
// LA: Moved condition_lock() inline.
// This was because we needed the lock around
// accessing the state after the wait
// LA: Check queue size and if we are stopped after being woken
while(queue.size() == 0 && !stop){
printf("Entering conditional lock\n");
pthread_cond_wait(&cond,&mutex);
}
printf("Exit from condition lock\n");
while(queue.size() > 0){
printf("item: %d\n",queue[0]);
queue.pop_back();
}
}
pthread_mutex_unlock(&mutex);
pthread_exit(NULL);
}
void push(int value){
// LA: All state mutation needs to be guarded.
pthread_mutex_lock(&mutex);
queue.push_back(value);
pthread_mutex_unlock(&mutex);
}
void join(){
void *status;
pthread_join(thread,&status);
}
void stop_thread(){
// LA: All state mutation needs to be guarded.
pthread_mutex_lock(&mutex);
stop = true;
pthread_mutex_unlock(&mutex);
interrupt();
}
void start_thread(){
int status = pthread_create(&thread,NULL,run_helper,this);
}
};
extern "C" void *run_helper(void* context){
return ((Worker *)context)->run();
}
int main(){
Worker worker;
worker.start_thread();
usleep(500000);
for(int i=0;i<5;i++){
worker.push(i);
worker.interrupt();
usleep(500000);
}
worker.stop_thread();
worker.join();
printf("Thread exit\n");
return 0;
}
Upvotes: 1
Reputation: 385405
I also don't understand how
pthread_mutex_lock()
is working invoid interrupt()
.
It's not. It's causing a deadlock.
it should be already locked by
void condition_lock()
.
It is. That's why:
It's never return from
pthread_cond_wait()
.
Upvotes: -2