Reputation: 1075
In my program, a producer thread reads lines of text from a text file (about 8000 lines) and loads them into a concurrent queue.
Three consumer threads read the lines from the queue each writing to a separate file.
When I run the program only the producer thread and only one of the consumer threads complete. The other two threads seem to hang.
How do I reliably tell all consumer threads that the end of the file has been reached, so that they return, while making sure the queue has been completely emptied first?
My platform is Windows 7 64-bit with VC11.
The code behaves the same whether compiled as 64-bit or 32-bit.
Here is the code (it is self-contained and compilable):
#include <queue>
#include <vector>
#include <iostream>
#include <fstream>
#include <atomic>
#include <thread>
#include <condition_variable>
#include <mutex>
#include <string>
#include <memory>
template<typename Data>
class concurrent_queue
{
private:
std::queue<Data> the_queue;
mutable std::mutex the_mutex;
std::condition_variable the_condition_variable;
public:
void push(Data const& data){
{
std::lock_guard<std::mutex> lock(the_mutex);
the_queue.push(data);
}
the_condition_variable.notify_one();
}
bool empty() const{
std::unique_lock<std::mutex> lock(the_mutex);
return the_queue.empty();
}
const size_t size() const{
std::lock_guard<std::mutex> lock(the_mutex);
return the_queue.size();
}
bool try_pop(Data& popped_value){
std::unique_lock<std::mutex> lock(the_mutex);
if(the_queue.empty()){
return false;
}
popped_value=the_queue.front();
the_queue.pop();
return true;
}
void wait_and_pop(Data& popped_value){
std::unique_lock<std::mutex> lock(the_mutex);
while(the_queue.empty()){
the_condition_variable.wait(lock);
}
popped_value=the_queue.front();
the_queue.pop();
}
};
std::atomic<bool> done(true);
typedef std::vector<std::string> segment;
concurrent_queue<segment> data;
const int one_block = 15;
void producer()
{
done.store(false);
std::ifstream inFile("c:/sample.txt");
if(!inFile.is_open()){
std::cout << "Can't read from file\n";
return;
}
std::string line;
segment seg;
int cnt = 0;
while(std::getline(inFile,line)){
seg.push_back(line);
++cnt;
if( cnt == one_block ){
data.push( seg );
seg.clear();
cnt = 0;
}
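}
// Push the final, partially filled block so trailing lines are not lost.
if(!seg.empty()){
data.push( seg );
}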
inFile.close();
done.store(true);
std::cout << "all done\n";
}
void consumer( std::string fname)
{
std::ofstream outFile(fname.c_str());
if(!outFile.is_open()){
std::cout << "Can't write to file\n";
return;
}
do{
while(!data.empty()){
segment seg;
data.wait_and_pop( seg );
for(size_t i = 0; i < seg.size(); ++i)
{
outFile << seg[i] << std::endl;
}
outFile.flush();
}
} while(!done.load());
outFile.close();
std::cout << fname << " done.\n";
}
int main()
{
std::thread th0(producer);
std::thread th1(consumer, "Worker1.txt");
std::thread th2(consumer, "Worker2.txt");
std::thread th3(consumer, "Worker3.txt");
th0.join();
th1.join();
th2.join();
th3.join();
return 0;
}
Upvotes: 0
Views: 1955
Reputation: 279305
You probably want to add a boolean flag to concurrent_queue. Set it (under the mutex) once the file has been read. Once the file has been read and the queue is empty, broadcast on the condition variable with notify_all from the consumer that emptied the queue.
This will wake up all the other consumers, which need to spot the final condition (flag set and queue empty) and quit their loop. To avoid the race condition, that means they need to check the same combined condition before waiting in the first place.
The problem with your existing flag is that threads that never wake from waiting on the condvar never check it. The "finished" flag needs to be part of the state that they're waiting for.
[Edit: Dietmar's subtly different meaning for the flag probably results in simpler code, but I haven't written them both to compare.]
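A minimal sketch of the idea, under some assumptions: the names closable_queue, set_finished and run_finished_flag_demo are hypothetical and not from the question, and for simplicity the broadcast is issued by the thread that sets the flag rather than by the consumer that empties the queue. The point it illustrates is that the wait predicate covers both "queue not empty" and "finished", so a sleeping consumer always re-checks the flag when woken.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical variant of concurrent_queue that carries a "finished" flag.
template <typename Data>
class closable_queue {
    std::queue<Data> the_queue;
    std::mutex the_mutex;
    std::condition_variable the_condition_variable;
    bool finished;  // guarded by the_mutex
public:
    closable_queue() : finished(false) {}

    void push(Data const& data) {
        {
            std::lock_guard<std::mutex> lock(the_mutex);
            the_queue.push(data);
        }
        the_condition_variable.notify_one();
    }

    // Called by the producer after its last push.
    void set_finished() {
        {
            std::lock_guard<std::mutex> lock(the_mutex);
            finished = true;
        }
        the_condition_variable.notify_all();  // wake every waiting consumer
    }

    // Returns false only when the queue is both finished and empty.
    bool wait_and_pop(Data& popped_value) {
        std::unique_lock<std::mutex> lock(the_mutex);
        the_condition_variable.wait(
            lock, [this] { return finished || !the_queue.empty(); });
        if (the_queue.empty()) return false;  // finished and drained
        popped_value = the_queue.front();
        the_queue.pop();
        return true;
    }
};

// Hypothetical driver: returns how many items the consumers popped in total.
std::size_t run_finished_flag_demo(int item_count, int consumer_count) {
    closable_queue<int> q;
    std::atomic<std::size_t> popped(0);
    std::vector<std::thread> consumers;
    for (int c = 0; c < consumer_count; ++c) {
        consumers.push_back(std::thread([&q, &popped] {
            int value = 0;
            while (q.wait_and_pop(value)) ++popped;
        }));
    }
    for (int i = 0; i < item_count; ++i) q.push(i);
    q.set_finished();
    for (std::size_t i = 0; i < consumers.size(); ++i) consumers[i].join();
    return popped.load();
}
```

With a queue like this, the consumer loop in the question would reduce to while(data.wait_and_pop(seg)){ ... }, and the separate done atomic would no longer be needed.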
Upvotes: 0
Reputation: 1672
Look at the following code:
while(!data.empty()){
segment seg;
data.wait_and_pop( seg );
...
Consider the situation where only the last segment remains in the queue, and consumers th1 and th2 are both about to read. Consumer th1 checks !data.empty() and finds there is data to be read. Then, before th1 calls data.wait_and_pop(), consumer th2 also checks !data.empty() and finds it to be true. Assume consumer th1 consumes the last segment. Now, since there is no segment left to read, th2 waits indefinitely in the while(the_queue.empty()) loop inside data.wait_and_pop().
Try this code instead of the snippet above:
segment seg;
while(data.try_pop(seg)){
...
That should get it working, because try_pop checks for emptiness and pops under a single lock.
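For illustration, here is a sketch of a polling consumer built on try_pop. The names polling_queue and run_polling_demo are hypothetical, and the queue is a cut-down stand-in for the one in the question. Two details are assumptions beyond the suggestion above: the done flag starts out false, and each consumer reads it before draining, so that a segment pushed just before the flag is set cannot be left behind.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Cut-down stand-in for the question's concurrent_queue: push and try_pop only.
template <typename Data>
class polling_queue {
    std::queue<Data> the_queue;
    std::mutex the_mutex;
public:
    void push(Data const& data) {
        std::lock_guard<std::mutex> lock(the_mutex);
        the_queue.push(data);
    }
    // The emptiness check and the pop happen under the same lock.
    bool try_pop(Data& popped_value) {
        std::lock_guard<std::mutex> lock(the_mutex);
        if (the_queue.empty()) return false;
        popped_value = the_queue.front();
        the_queue.pop();
        return true;
    }
};

// Hypothetical driver: returns how many items the consumers popped in total.
std::size_t run_polling_demo(int item_count, int consumer_count) {
    polling_queue<int> q;
    std::atomic<bool> done(false);  // starts false so early consumers keep polling
    std::atomic<std::size_t> popped(0);

    std::vector<std::thread> consumers;
    for (int c = 0; c < consumer_count; ++c) {
        consumers.push_back(std::thread([&q, &done, &popped] {
            for (;;) {
                // Read the flag BEFORE draining: if it was already set, every
                // push has completed, so this drain sees all remaining items.
                const bool was_done = done.load();
                int value = 0;
                while (q.try_pop(value)) ++popped;
                if (was_done) break;
                std::this_thread::yield();  // queue empty for now; poll again
            }
        }));
    }

    for (int i = 0; i < item_count; ++i) q.push(i);
    done.store(true);  // set only after the last push

    for (std::size_t i = 0; i < consumers.size(); ++i) consumers[i].join();
    return popped.load();
}
```

The trade-off is that idle consumers spin instead of sleeping on the condition variable, which is usually acceptable for a short-lived job but wasteful for a long-running one.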
Upvotes: 0
Reputation: 153915
The approach I use to terminate all threads waiting on a queue is to have a flag on the queue stating whether it is done; the pop() function tests this flag before checking whether there are elements. If the flag indicates that the program should stop, any thread calling pop() throws an exception if there are no elements in the queue. When the flag is changed, the changing thread just calls notify_all() on the corresponding condition variable.
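A rough sketch of how such a queue might look. The names stoppable_queue, queue_done, stop and run_exception_demo are hypothetical, chosen only for this illustration, and the details are one possible reading of the description above rather than the exact code it refers to.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <vector>

// Hypothetical exception thrown by pop() once the queue is done and empty.
struct queue_done : std::runtime_error {
    queue_done() : std::runtime_error("queue is done") {}
};

template <typename Data>
class stoppable_queue {
    std::queue<Data> the_queue;
    std::mutex the_mutex;
    std::condition_variable the_condition_variable;
    bool done;  // guarded by the_mutex
public:
    stoppable_queue() : done(false) {}

    void push(Data const& data) {
        {
            std::lock_guard<std::mutex> lock(the_mutex);
            the_queue.push(data);
        }
        the_condition_variable.notify_one();
    }

    // The thread that changes the flag wakes all waiters.
    void stop() {
        {
            std::lock_guard<std::mutex> lock(the_mutex);
            done = true;
        }
        the_condition_variable.notify_all();
    }

    Data pop() {
        std::unique_lock<std::mutex> lock(the_mutex);
        while (the_queue.empty()) {
            if (done) throw queue_done();  // nothing left and no more coming
            the_condition_variable.wait(lock);
        }
        Data value = the_queue.front();
        the_queue.pop();
        return value;
    }
};

// Hypothetical driver: returns how many items the consumers popped in total.
std::size_t run_exception_demo(int item_count, int consumer_count) {
    stoppable_queue<int> q;
    std::atomic<std::size_t> popped(0);
    std::vector<std::thread> consumers;
    for (int c = 0; c < consumer_count; ++c) {
        consumers.push_back(std::thread([&q, &popped] {
            try {
                for (;;) { q.pop(); ++popped; }
            } catch (queue_done const&) {
                // normal termination: queue drained and stopped
            }
        }));
    }
    for (int i = 0; i < item_count; ++i) q.push(i);
    q.stop();
    for (std::size_t i = 0; i < consumers.size(); ++i) consumers[i].join();
    return popped.load();
}
```

Because pop() only throws when the queue is empty, items pushed before stop() are still delivered, which matches the requirement that the queue be completely drained before the consumers return.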
Upvotes: 1