Reputation: 2087
Consider the next C++ code fragment which represnets (pseudo) multi thread application which has two threads, the main thread responsible for getting new data (e.g., new frame from video stream) and the secondary thread which needs to process the data (e.g., filter the frame), this should happen in parallel so, for example, when the main thread acquires the first frame the second thread does not do anything, when the main thread acquires the second frame the second thread process the first frame and so on.
#include <iostream>
#include <mutex>
#include <condition_variable>
using namespace std;
bool StartProcessFromMain = false;
bool FinishedProcess = false;
mutex Main2Process;
mutex Process2Main;
condition_variable CVMain2Process;
condition_variable CVProcess2Main;
void Process();
int main()
{
long long int Iter = 0;
// start data process thread
thread ProcessThread(Process);
StartProcessFromMain = true;
CVMain2Process.notify_one();
// this is the main data acquiring thread
unique_lock<mutex> lk2(Process2Main);
lk2.unlock();
while (true)
{
cout << "Iteration: " << ++Iter << endl;
cout << "Waiting for data processing to finish" << endl;
lk2.lock();
CVProcess2Main.wait(lk2, [&] {return FinishedProcess; });
FinishedProcess = false;
lk2.unlock();
cout << "Data processing ended..... getting new data" << endl;
// getting new data
cout << "Got new data, calling process thread" << endl;
StartProcessFromMain = true;
CVMain2Process.notify_one();
}
return 0;
}
void Process()
{
//data processing thread
unique_lock<mutex> lk(Main2Process);
lk.unlock();
while (true)
{
cout << "waiting for new data" << endl;
lk.lock();
CVMain2Process.wait(lk, [&] {return StartProcessFromMain; });
StartProcessFromMain = false;
lk.unlock();
// Processing new data here
// Finished processing new data - notify data acquiring thread
cout << "Finished processing new data" << endl;
FinishedProcess = true;
CVProcess2Main.notify_one();
}
}
In practice, compiled both with MSVC 2015 (x64) on Windows 10 and with gcc on Ubuntu 16.04, this code always hangs forever (usually after thousands or tens of thousands iterations) when both threads wait one for the other :-(
What am I doing wrong here?
Upvotes: 0
Views: 767
Reputation: 10445
I wasn't able to compile your program with any of the c++ versions I could find, so I translated (upgraded?) it to C, then made it work with pthreads. It is at the bottom.
The most important bit; it uses one mutex and one condvar to protects two flags - ProcessingData, DataAvailable. These are the fundamental communication between the threads. The first thread, main, first waits until ProcessingData is false. This keeps it from interfering with the Process() thread while it is working on the shared data. Once ProcessingData is false, main is free to go and get fresh data. It then reacquires the single mutex (MainLock), sets DataAvailable to true, and signals the condvar MainCV.
The second thread, Process, first waits until DataAvailable is true. This keeps it from interfering with the main() thread while it is off getting new data. Once DataAvailable is true, it sets ProcessingData to true, goes and processes the data, and once it is done, reacquires the single mutex MainLock, clears ProcessingData and signals the condvar MainCV.
This is free of races; and the threads can continue independently after handshaking the data exchange. This can be done with two condvars and one mutex, but there isn't really any advantage to it.
#include <pthread.h>
#include <stdlib.h>
#include <stdio.h>
#include <stdbool.h>
bool ProcessingData = false;
bool DataAvailable = false;
pthread_mutex_t MainLock;
pthread_cond_t MainCV;
pthread_cond_t MainCV;
void *Process(void *);
#define X(y) do { if (y == -1) abort(); } while (0)
int
main()
{
X(pthread_mutex_init(&MainLock, NULL));
X(pthread_cond_init(&MainCV, NULL));
long Iter = 0;
//start data process thread
pthread_t id;
X(pthread_create(&id, NULL, Process, NULL));
X(pthread_cond_signal(&MainCV));
while (true) {
printf("Iteration: %ld waiting for processing to finish\n", ++Iter);
X(pthread_mutex_lock(&MainLock));
while (ProcessingData) {
X(pthread_cond_wait(&MainCV, &MainLock));
}
X(pthread_mutex_unlock(&MainLock));
printf("Data processing ended, getting new data\n");
//getting new data
printf("Got new data, calling process thread\n");
X(pthread_mutex_lock(&MainLock));
DataAvailable = true;
X(pthread_mutex_unlock(&MainLock));
X(pthread_cond_signal(&MainCV));
}
return 0;
}
void *Process(void *notused)
{
//data processing thread
while (true) {
printf("waiting for new data\n");
X(pthread_mutex_lock(&MainLock));
while (!DataAvailable) {
X(pthread_cond_wait(&MainCV, &MainLock));
}
ProcessingData = true;
X(pthread_mutex_unlock(&MainLock));
//Processing new data here
// Finished processing new data - notify data acquiring thread
printf("Finished processing new data\n");
X(pthread_mutex_lock(&MainLock));
ProcessingData = false;
X(pthread_mutex_unlock(&MainLock));
X(pthread_cond_signal(&MainCV));
}
}
Upvotes: 1