Reputation: 1145
I am doing a project with threading in UNIX and C++. Basically there is one producer thread and 5 consumer threads. The producer thread adds incrementing numbers to a queue at random times, and the consumer threads poll the queue, trying to remove items from it. For some reason my q.size() keeps going negative and I can't figure out why.
#include <queue>
#include <list>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
using namespace std;
// Number of consumer threads that main() starts.
#define NUM_CONSUMER_THREADS 5
// Number of producer threads that main() starts.
#define NUM_PRODUCER_THREADS 1
// Upper bound on q.size(); append() and the producer loop both test against it.
#define BUFFER_SIZE 20
// Thread entry points, defined after main().
void *c_thread_function(void *arg);
void *p_thread_function(void *arg);
// Queue shared by the producer and all consumer threads. Nothing in this
// file locks it, so every access from a thread is unsynchronized.
queue<int> q;
// Returns the value that follows `cur` in the producer's sequence (cur + 1).
int produce(int cur)
{
    return cur + 1;
}
// Pushes `num` onto the shared queue `q`, unless the queue already holds
// BUFFER_SIZE or more elements, in which case the value is silently dropped.
void append(int num)
{
    if ( q.size() >= BUFFER_SIZE )
    {
        return;
    }
    q.push(num);
}
// Removes the element at the front of the shared queue `q` and returns it.
// No emptiness check or locking happens here; the only caller in this file
// tests q.size() > 0 before calling.
int take()
{
// Copy the front value before it is popped.
int removed = q.front();
q.pop();
// Pause for one second after the pop and before returning the value.
sleep(1);
return removed;
}
// Reports on stdout that consumer number `thread` processed the value `num`.
void consume(int num, int thread)
{
    fprintf(stdout, "%d consumed %d \n", thread, num);
}
// Program entry point: starts NUM_CONSUMER_THREADS consumer threads and
// NUM_PRODUCER_THREADS producer threads, then blocks for as long as they run.
// Exits with EXIT_FAILURE if any thread cannot be created.
int main()
{
    int result;
    pthread_t cthreads[NUM_CONSUMER_THREADS];
    pthread_t pthreads[NUM_PRODUCER_THREADS];

    // build an array of consumer threads
    for(int num_of_cthreads = 0; num_of_cthreads < NUM_CONSUMER_THREADS; num_of_cthreads++)
    {
        // The consumer id travels inside the pointer value itself (it is not
        // the address of the loop variable). Widen it to long first so the
        // integer has pointer width before it is converted to void*.
        result = pthread_create(&(cthreads[num_of_cthreads]), NULL, c_thread_function,
                                reinterpret_cast<void *>(static_cast<long>(num_of_cthreads)));
        if ( result != 0 )
        {
            // pthread_create returns the error number instead of setting
            // errno, so perror() would describe an unrelated error here.
            fprintf(stderr, "Thread Creation Failed: %s\n", strerror(result));
            exit(EXIT_FAILURE);
        }
    }

    // build an array of producer threads
    for(int num_of_pthreads = 0; num_of_pthreads < NUM_PRODUCER_THREADS; num_of_pthreads++)
    {
        result = pthread_create(&(pthreads[num_of_pthreads]), NULL, p_thread_function, NULL);
        if ( result != 0 )
        {
            fprintf(stderr, "Thread Creation Failed: %s\n", strerror(result));
            exit(EXIT_FAILURE);
        }
    }

    printf("All threads created\n");

    // Block on the workers instead of spinning in an empty while(true) loop:
    // the spin kept one core busy for nothing, and an infinite loop without
    // side effects is not something C++ guarantees to behave as written.
    // The worker loops in this file never return, so these joins simply keep
    // the process alive.
    for(int i = 0; i < NUM_CONSUMER_THREADS; i++)
    {
        pthread_join(cthreads[i], NULL);
    }
    for(int i = 0; i < NUM_PRODUCER_THREADS; i++)
    {
        pthread_join(pthreads[i], NULL);
    }
    return 0;
}
// Consumer thread entry point.
// `arg` carries the consumer's numeric id encoded in the pointer value
// (main() passes the loop index converted to void*). The function never
// returns: it keeps polling the shared queue `q` and, while it is non-empty,
// removes values with take() and reports them with consume().
void *c_thread_function(void *arg)
{
    int temp = static_cast<int>(reinterpret_cast<long>(arg));
    printf("Consumer thread %d created \n", temp);
    while ( true )
    {
        while ( q.size() > 0 )
        {
            int w = take();
            consume(w, temp);
            // q.size() is an unsigned size type; passing it to %d is a
            // format/argument mismatch, so print it with %zu instead.
            printf(" q size is now %zu \n", q.size());
        }
    }
}
// Producer thread entry point. `arg` is unused (main() passes NULL).
// The function never returns: while the shared queue `q` holds fewer than
// BUFFER_SIZE elements it generates the next number with produce(), sleeps
// for a random 1-9 seconds, and hands the number to append().
void *p_thread_function(void *arg)
{
    (void)arg;
    printf("Producer thread created \n");
    int itemsAdded = 0;
    while ( true )
    {
        while ( q.size() < BUFFER_SIZE )
        {
            const int temp = produce(itemsAdded);
            // Scale rand() into [0, 9) and shift to get a delay of 1..9 seconds.
            const int sleepTime = 1+(int)(9.0*rand()/(RAND_MAX+1.0));
            sleep(sleepTime);
            append(temp);
            printf("Producer adds: %d \n", temp);
            // q.size() is an unsigned size type; passing it to %d is a
            // format/argument mismatch, so print it with %zu instead.
            printf(" q size is now %zu \n", q.size());
            itemsAdded++;
        }
    }
}
Output:
Producer adds: 1
q size is now -1
0 consumed 1
q size is now -2
1 consumed 1
q size is now -3
3 consumed 1
q size is now -4
4 consumed 0
q size is now -5
0 consumed 0
Upvotes: 1
Views: 3731
Reputation: 264431
Couple of issues:
You are doing a busy wait.
Learn to use condition variables, so that waiting threads do not consume resources.
The int temp = (long)arg;
is not going to work.
There is no guarantee when the thread is going to be scheduled to run.
Thus the pointer arg may be pointing at a variable whose value changed long ago.
Both consumer/producer threads modify the queue q
without gaining exclusive access.
Any other thread can modify the queue between the test on the size and the point where you add stuff. Even worse, another thread may simultaneously try to modify the queue (and I am relatively certain that the STL containers are not thread-safe for modification).
Try something like this:
#include <iostream>
#include <vector>
#include <queue>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include <time.h>
// Number of consumer threads started by main().
#define CONSUMER_COUNT 5
// Number of producer threads started by main().
#define PRODUCER_COUNT 2
// Shared state for the producer/consumer example: an int queue together with
// the mutex and condition variable used around it.
//   data     - pending values; the worker threads touch it while holding `mutex`.
//   mutex    - guards `data` (the threads also hold it while printing).
//   cond     - signalled by a producer after each push; consumers wait on it
//              while `data` is empty.
//   finished - shutdown flag polled by the worker threads.
// The constructor throws int(1) if either pthread object cannot be initialised.
struct ThreadQueue
{
    ThreadQueue()
        : finished(false)
    {
        if (pthread_mutex_init(&mutex, NULL) != 0)
        {
            throw int(1);
        }
        if (pthread_cond_init(&cond, NULL) != 0)
        {
            // The destructor does not run for an object whose constructor
            // throws, so release the already-initialised mutex here.
            pthread_mutex_destroy(&mutex);
            throw int(1);
        }
    }
    ~ThreadQueue()
    {
        // Failures are ignored on purpose: a destructor must not throw.
        pthread_cond_destroy(&cond);
        pthread_mutex_destroy(&mutex);
    }

    std::queue<int>     data;
    pthread_mutex_t     mutex;
    pthread_cond_t      cond;
    bool                finished;

private:
    // pthread mutexes and condition variables must not be copied by value,
    // so the copy operations are declared but intentionally never defined.
    ThreadQueue(const ThreadQueue&);
    ThreadQueue& operator=(const ThreadQueue&);
};
// Consumer thread entry point (C linkage so it can be handed to pthread_create).
// `arg` must point to the ThreadQueue shared with the producers.
// Repeatedly waits for a value, removes it from the queue and prints it.
// Returns NULL once the queue's `finished` flag is observed.
extern "C" void* consumerThread(void* arg)
{
    ThreadQueue& que = *static_cast<ThreadQueue*>(arg);
    for (;;)
    {
        // Get the lock before looking at any shared state.
        pthread_mutex_lock(&que.mutex);

        // Sleep on the condition while there is nothing to do. Being woken
        // does not mean data is available: another consumer may already have
        // taken the value (or the wakeup was for shutdown), so the predicate
        // is re-checked in a loop every time the wait returns.
        while (!que.finished && que.data.empty())
        {
            pthread_cond_wait(&que.cond, &que.mutex);
        }

        // Leave before touching the queue: after a shutdown wakeup the queue
        // may be empty, and front()/pop() on an empty queue is undefined.
        if (que.finished)
        {
            pthread_mutex_unlock(&que.mutex);
            break;
        }

        // We hold the lock and the queue is non-empty.
        int value = que.data.front();
        que.data.pop();

        // Use the same lock to serialise access to std::cout.
        std::cout << "Consumer Got: " << value << "\n";

        pthread_mutex_unlock(&que.mutex);
    }
    return NULL;
}
// Producer thread entry point (C linkage so it can be handed to pthread_create).
// `arg` must point to the ThreadQueue shared with the consumers.
// Repeatedly pushes a random value, signals one consumer, then sleeps for
// 0-4 seconds. Returns NULL once the queue's `finished` flag is observed.
extern "C" void* producerThread(void* arg)
{
    ThreadQueue& que = *static_cast<ThreadQueue*>(arg);
    for (;;)
    {
        // Get the lock before proceeding.
        pthread_mutex_lock(&que.mutex);

        // Read the shutdown flag while holding the lock rather than in the
        // loop condition, so the read is ordered with writers that also lock.
        if (que.finished)
        {
            pthread_mutex_unlock(&que.mutex);
            break;
        }

        // Add a new value to the queue.
        int value = rand();
        que.data.push(value);

        // Use the same lock to serialise access to std::cout.
        std::cout << "Producer Push: " << value << "\n";

        // Signal one consumer to be released.
        pthread_cond_signal(&que.cond);

        // rand() maintains internal state, so calls to it are made while the
        // mutex is held; this simple example re-uses the queue mutex for that.
        int sleepTime = rand() % 5;

        // Release the lock before sleeping so other threads can make progress.
        pthread_mutex_unlock(&que.mutex);
        sleep(sleepTime);
    }
    return NULL;
}
int main()
{
srand(time(NULL));
ThreadQueue queue;
pthread_t consumerThreads[CONSUMER_COUNT];
pthread_t producerThreads[PRODUCER_COUNT];
try
{
for(int loop=0 ;loop < CONSUMER_COUNT; ++loop)
{
if (pthread_create(&consumerThreads[loop], NULL, consumerThread, &queue) != 0)
{ throw int(2);
}
}
for(int loop=0 ;loop < PRODUCER_COUNT; ++loop)
{
if (pthread_create(&producerThreads[loop], NULL, producerThread, &queue) != 0)
{ throw int(3);
}
}
}
catch(...)
{
// Set the finished to true so all threads exit.
queue.finished = true;
// Some consumers may be waiting on the condition.
// So wake them up one signal per consumer should do it.
for(int loop = 0;loop < CONSUMER_COUNT; ++loop)
{ pthread_cond_signal(&queue.cond);
}
}
/* Wait for all threads to finish */
for(int loop=0; loop < CONSUMER_COUNT; ++loop)
{
pthread_join(consumerThreads[loop], NULL);
}
for(int loop=0; loop < PRODUCER_COUNT; ++loop)
{
pthread_join(producerThreads[loop], NULL);
}
};
Hope I got that correct :-)
Upvotes: 1
Reputation: 1145
I found the issue...
The point of the assignment was to show how unreliable threading without semaphores is. Here is the code I needed to fix...
// Variant of take() posted as the asker's fix: the one-second sleep now sits
// between reading the front element and popping it. There is still no
// emptiness check or locking in this function.
int take()
{
int removed = q.front();
// While this thread sleeps the element is still at the front of q, so other
// consumer threads can read the same value (the output listed below shows
// several threads consuming the same number).
sleep(1); // switched
q.pop(); // these two...
return removed;
}
I also removed the sleep timer from the producer thread. Now everything works...
the output now does this:
---Producer adds: 1 ---
---Producer adds: 2 ---
---Producer adds: 3 ---
---Producer adds: 4 ---
---Producer adds: 5 ---
---Producer adds: 6 ---
---Producer adds: 7 ---
---Producer adds: 8 ---
---Producer adds: 9 ---
---Producer adds: 10 ---
---Producer adds: 11 ---
---Producer adds: 12 ---
---Producer adds: 13 ---
---Producer adds: 14 ---
---Producer adds: 15 ---
---Producer adds: 16 ---
---Producer adds: 17 ---
---Producer adds: 18 ---
---Producer adds: 19 ---
---Producer adds: 20 ---
Thread 3 consumed 1
Thread 1 consumed 1
Thread 2 consumed 1
Thread 4 consumed 1
Thread 0 consumed 1
---Producer adds: 21 ---
---Producer adds: 22 ---
---Producer adds: 23 ---
---Producer adds: 24 ---
---Producer adds: 25 ---
Thread 3 consumed 6
Thread 4 consumed 6
Thread 1 consumed 6
---Producer adds: 26 ---
---Producer adds: 27 ---
---Producer adds: 28 ---
---Producer adds: 29 ---
---Producer adds: 30 ---
Thread 0 consumed 6
Thread 2 consumed 6
Thread 3 consumed 11
Thread 4 consumed 11
Upvotes: 0
Reputation: 53289
You need to learn about the concept of race conditions and mutual exclusion. Your std::queue
object is a shared resource, meaning that more than one thread is operating on it - potentially at the same time. That means you have to protect it using locks (known as mutexes), so that each access is synchronized. Otherwise, you'll get what's known as a race condition, where one thread modifies data while another thread is also accessing/modifying the data, leading to an inconsistent or corrupt program state.
To prevent race conditions, you need to lock a pthread_mutex_t
object before every queue access.
First, you need to create a mutex object and initialize it.
// Create the mutex object and initialise it with default attributes
// (a null attribute pointer). The POSIX type is pthread_mutex_t.
pthread_mutex_t mymutex;
// pthread_mutex_init returns 0 on success; keep the result so that a failed
// initialisation is not silently ignored.
int mymutex_rc = pthread_mutex_init(&mymutex, 0);
Your application code should then look something like this:
// Acquire the lock; this call waits while another thread holds it.
pthread_mutex_lock(&mymutex);
// Do something with queue
// Release the lock so that a waiting thread can acquire it.
pthread_mutex_unlock(&mymutex);
When one thread acquires the lock, no other thread can acquire the lock. A thread that tries to acquire a lock that has already been acquired by another thread will simply wait until the lock is released. This synchronizes access to the queue, ensuring that only one thread modifies it at a time.
Upvotes: 5
Reputation: 70701
STL containers such as queue
are not thread-safe. You need to synchronize access to the queue object, for example, using a mutex.
Upvotes: 4