Reputation: 91
This is a simple program with a function start() that waits for the user to enter something (in an infinite loop) and stores it in a queue. start() runs in a separate thread. After the user enters a value, the size of the queue is still zero in main. How can the queue be synchronized?
code: source.cpp
#include <iostream>
#include "kl.h"
using namespace std;
int main()
{
    std::thread t1(start);
    while (1)
    {
        if (q.size() > 0)
        {
            std::cout << "never gets inside this if\n";
            std::string first = q.front();
            q.pop();
        }
    }
    t1.join();
}
code: kl.h
#include <queue>
#include <iostream>
#include <string>
void start();
static std::queue<std::string> q;
code: kl.cpp
#include "kl.h"
using namespace std;
void start()
{
    char i;
    string str;
    while (1)
    {
        for (i = 0; i <= 1000; i++)
        {
            //other stuff and str input
            q.push(str);
        }
    }
}
Upvotes: 4
Views: 4444
Reputation: 29
/* A code snippet with separate Producer and Consumer classes and a shared bounded buffer class. */
#include <chrono>
#include <condition_variable>
#include <cstdlib>
#include <deque>
#include <iostream>
#include <mutex>
#include <thread>
using namespace std;

mutex _mutex_1, _mutex_2;   // _mutex_1 guards the buffer, _mutex_2 guards cout
condition_variable cv;

template <typename T>
class Queue
{
    deque<T> _buffer;
    const unsigned int max_size = 10;
public:
    Queue() = default;

    void push(const T& item)
    {
        unique_lock<mutex> locker(_mutex_1);
        // Block while the buffer is full.
        cv.wait(locker, [this]() { return _buffer.size() < max_size; });
        _buffer.push_back(item);
        locker.unlock();
        cv.notify_all();
    }

    T pop()
    {
        unique_lock<mutex> locker(_mutex_1);
        // Block while the buffer is empty.
        cv.wait(locker, [this]() { return _buffer.size() > 0; });
        T item = _buffer.front();   // oldest element first (FIFO)
        _buffer.pop_front();
        locker.unlock();
        cv.notify_all();
        return item;
    }
};

class Producer
{
    Queue<int>* _buffer;
public:
    Producer(Queue<int>* _buf) { this->_buffer = _buf; }

    void run()
    {
        while (1)
        {
            auto num = rand() % 100;
            _buffer->push(num);
            {
                lock_guard<mutex> out(_mutex_2);
                cout << "Produced:" << num << endl;
            }
            // Sleep without holding the output lock.
            this_thread::sleep_for(chrono::milliseconds(50));
        }
    }
};

class Consumer
{
    Queue<int>* _buffer;
public:
    Consumer(Queue<int>* _buf) { this->_buffer = _buf; }

    void run()
    {
        while (1)
        {
            auto num = _buffer->pop();
            {
                lock_guard<mutex> out(_mutex_2);
                cout << "Consumed:" << num << endl;
            }
            this_thread::sleep_for(chrono::milliseconds(50));
        }
    }
};

void client()
{
    Queue<int> b;
    Producer p(&b);
    Consumer c(&b);
    thread producer_thread(&Producer::run, &p);
    thread consumer_thread(&Consumer::run, &c);
    producer_thread.join();
    consumer_thread.join();
}

int main()
{
    client();
    return 0;
}
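A possible variation on the snippet above: it keeps the mutex and the condition variable as globals, so every Queue instance would share them, and producers and consumers all wait on the same condition variable. Below is a minimal sketch, not the poster's code, that moves the synchronization state into the class and uses one condition variable per direction. The names BoundedQueue, not_full_, not_empty_ and transfer are made up for illustration.

```cpp
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical name: a bounded FIFO buffer that owns its own mutex and condition variables.
template <typename T>
class BoundedQueue {
    std::deque<T> buffer_;
    const std::size_t capacity_;         // assumed to be at least 1
    std::mutex mutex_;
    std::condition_variable not_full_;   // producers wait on this
    std::condition_variable not_empty_;  // consumers wait on this
public:
    explicit BoundedQueue(std::size_t capacity) : capacity_(capacity) {}

    void push(const T& item) {
        std::unique_lock<std::mutex> lock(mutex_);
        not_full_.wait(lock, [this] { return buffer_.size() < capacity_; });
        buffer_.push_back(item);
        lock.unlock();
        not_empty_.notify_one();  // one more item is available to read
    }

    T pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        not_empty_.wait(lock, [this] { return !buffer_.empty(); });
        T item = buffer_.front();
        buffer_.pop_front();
        lock.unlock();
        not_full_.notify_one();   // one more slot is available to write
        return item;
    }
};

// Hypothetical helper: pushes 0..count-1 from a second thread and returns what the caller popped.
std::vector<int> transfer(int count, std::size_t capacity) {
    BoundedQueue<int> queue(capacity);
    std::thread producer([&queue, count] {
        for (int i = 0; i < count; ++i) queue.push(i);
    });
    std::vector<int> received;
    for (int i = 0; i < count; ++i) received.push_back(queue.pop());
    producer.join();
    return received;
}
```

Calling transfer(100, 4) from main should hand back the values in the order they were pushed, even though the buffer only ever holds four of them at a time. With a single producer and a single consumer the two-condition-variable split mostly saves needless wakeups; it matters more once several threads sit on each side.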
Upvotes: 0
Reputation: 1403
My synchronized queue class, with an example of its usage:
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>

template<typename T>
class SyncQueue
{
    std::queue<T> m_Que;
    std::mutex m_Lock;
    std::condition_variable m_ConVar;
public:
    void enque(T item)
    {
        std::unique_lock<std::mutex> lock(m_Lock);
        m_Que.push(item);
        lock.unlock();
        m_ConVar.notify_all();
    }
    T deque()
    {
        std::unique_lock<std::mutex> lock(m_Lock);
        // The predicate is checked before waiting, so an item pushed before
        // this call is not missed, and spurious wakeups are ignored.
        m_ConVar.wait(lock, [this] { return !m_Que.empty(); });
        auto ret = m_Que.front();
        m_Que.pop();
        return ret;
    }
};

int main()
{
    using namespace std::chrono_literals;
    SyncQueue<int> sq;
    std::thread consumer([&sq]()
    {
        std::cout << "consumer" << std::endl;
        for(;;)
        {
            std::cout << sq.deque() << std::endl;
        }
    });
    std::thread provider([&sq]()
    {
        std::this_thread::sleep_for(1s);
        sq.enque(1);
        std::this_thread::sleep_for(3s);
        sq.enque(2);
        std::this_thread::sleep_for(5s);
        sq.enque(3);
    });
    provider.join();
    consumer.join(); // never returns: the consumer loops forever
    return 0;
}
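Note that the consumer in the example above loops forever, so consumer.join() never returns. One way to let it finish is to give the queue a "closed" state. The following is only a sketch under that assumption: close(), the std::optional return value (C++17) and the names ClosableQueue and run_until_closed are hypothetical additions, not part of the class above.

```cpp
#include <condition_variable>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical variant of a synchronized queue that can be closed by the producer.
template <typename T>
class ClosableQueue {
    std::queue<T> items_;
    std::mutex mutex_;
    std::condition_variable ready_;
    bool closed_ = false;
public:
    void enque(T item) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            items_.push(std::move(item));
        }
        ready_.notify_one();
    }

    // Signals that no more items will arrive and wakes every waiting consumer.
    void close() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            closed_ = true;
        }
        ready_.notify_all();
    }

    // Returns the next item, or std::nullopt once the queue is closed and drained.
    std::optional<T> deque() {
        std::unique_lock<std::mutex> lock(mutex_);
        ready_.wait(lock, [this] { return closed_ || !items_.empty(); });
        if (items_.empty()) return std::nullopt;
        T item = std::move(items_.front());
        items_.pop();
        return item;
    }
};

// Hypothetical driver: enqueues 1..count, closes the queue, and returns what the consumer saw.
std::vector<int> run_until_closed(int count) {
    ClosableQueue<int> queue;
    std::vector<int> seen;
    std::thread consumer([&] {
        while (auto item = queue.deque()) seen.push_back(*item);
    });
    for (int i = 1; i <= count; ++i) queue.enque(i);
    queue.close();
    consumer.join();  // returns once the queue is closed and empty
    return seen;
}
```

Items that were pushed before close() are still delivered; the consumer only sees std::nullopt after the queue has been drained, so nothing is dropped on shutdown.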
Upvotes: 0
Reputation: 76297
Your code contains a data race (it crashed for me): both threads may be modifying the shared queue at the same time. (Also, you're looping with char i for values up to 1000. On typical platforms, where char is 8 bits, it cannot hold 1000, so the condition i <= 1000 is always true. Probably not a good idea.)
You should protect your shared queue with a std::mutex, and use a std::condition_variable to signal that there is a reason to check the queue.
Specifically, you should consider the following pattern, which is very common for a producer-consumer case like yours:
Access the queue only while holding the mutex.
Use the condition variable to notify the other thread that you've pushed something into the queue.
Wait on the condition variable with a predicate (for example, "the queue is not empty"), so that the consumer continues only when there is something to process.
Here is a rewrite of your code:
#include <iostream>
#include <queue>
#include <string>
#include <thread>
#include <condition_variable>
#include <mutex>
using namespace std;

std::queue<std::string> q;
std::mutex m;
std::condition_variable cv;

void start()
{
    string str;
    for (std::size_t i = 0; i <= 1000; i++) {
        //other stuff and str input
        std::cout << "here" << std::endl;
        std::unique_lock<std::mutex> lk(m);
        q.push(str);
        lk.unlock();
        cv.notify_one();
    }
}

int main()
{
    std::thread t1(start);
    for (std::size_t i = 0; i <= 1000; i++)
    {
        std::unique_lock<std::mutex> lk(m);
        // Sleep until the producer has pushed something.
        cv.wait(lk, []{return !q.empty();});
        std::string first = q.front();
        q.pop();
    }
    t1.join();
}
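One more thing to check if you keep the original three-file layout instead of the single file above: kl.h declares q as static. At namespace scope, static gives the variable internal linkage, so source.cpp and kl.cpp each get their own separate q, and main would keep seeing an empty queue no matter how it is locked. Here is a sketch of one way around that, assuming C++17 inline variables are available. The file split is only indicated by comments, and take() is a hypothetical helper, not something from your code.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <string>
#include <thread>

// --- would live in kl.h (C++17) ---
// `inline` yields one shared object for every .cpp that includes the header;
// `static` here would instead give each .cpp its own private copy.
inline std::queue<std::string> q;
inline std::mutex m;
inline std::condition_variable cv;
void start();

// --- would live in kl.cpp ---
void start()
{
    for (int i = 0; i < 3; ++i) {
        {
            std::lock_guard<std::mutex> lk(m);
            q.push("item " + std::to_string(i));  // placeholder for the user input
        }
        cv.notify_one();
    }
}

// --- would live in source.cpp ---
// Hypothetical helper: blocks until `count` items have arrived and returns the last one.
std::string take(std::size_t count)
{
    std::string last;
    for (std::size_t i = 0; i < count; ++i) {
        std::unique_lock<std::mutex> lk(m);
        cv.wait(lk, [] { return !q.empty(); });
        last = q.front();
        q.pop();
    }
    return last;
}
```

In main you would start std::thread t1(start), call take(3), and then join t1. Before C++17, the usual equivalent is to write extern std::queue<std::string> q; in the header and define std::queue<std::string> q; in exactly one .cpp file (and likewise for the mutex and the condition variable).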
Upvotes: 2