JuggernautDad

Reputation: 1145

C++ UNIX threading

I am working on a threading project in C++ on UNIX. There is one producer thread and five consumer threads. The producer adds incrementing numbers to a queue at random intervals, and the consumers poll the queue and try to remove items. For some reason q.size() keeps going negative and I can't figure out why.

 #include <queue>
 #include <list>

 #include <stdio.h>
 #include <unistd.h>
 #include <stdlib.h>
 #include <string.h>
 #include <pthread.h>

 using namespace std;

 #define NUM_CONSUMER_THREADS 5
 #define NUM_PRODUCER_THREADS 1
 #define BUFFER_SIZE 20

 void *c_thread_function(void *arg);
 void *p_thread_function(void *arg);

 queue<int> q;

 int produce(int cur)
 {
  int temp = cur + 1;
  return temp;
 }

 void append(int num)
 {
  if ( q.size() < BUFFER_SIZE )
  {
   q.push(num);
  }
 }

 int take()
 {
  int removed = q.front();
  q.pop();
  sleep(1);
  return removed;
 }

 void consume(int num, int thread)
 {
  printf("%d consumed %d \n", thread, num);
 }


 int main() 
 {
  int result;

  pthread_t cthreads[NUM_CONSUMER_THREADS];
  pthread_t pthreads[NUM_PRODUCER_THREADS];

  void *thread_result;

  // build an array of consumer threads
  for(int num_of_cthreads = 0; num_of_cthreads < NUM_CONSUMER_THREADS; num_of_cthreads++) 
  {
   result = pthread_create(&(cthreads[num_of_cthreads]), NULL, c_thread_function, (void *)num_of_cthreads);
   if ( result != 0 )
   {
    perror( "Thread Creation Failed");
    exit(EXIT_FAILURE);
   }
   //sleep(1);  
  } 

  // build an array of producer threads
  for(int num_of_pthreads = 0; num_of_pthreads < NUM_PRODUCER_THREADS; num_of_pthreads++) 
  {
   result = pthread_create(&(pthreads[num_of_pthreads]), NULL, p_thread_function, NULL);
   if ( result != 0 )
   {
    perror( "Thread Creation Failed");
    exit(EXIT_FAILURE);
   }
   //sleep(1);  
  }

  printf("All threads created\n");
  while ( true )
  {
   // do nothing
  }
 }

 void *c_thread_function(void *arg)
 {
  int temp = (long)arg;
  printf("Consumer thread %d created \n", temp);

  while ( true )
  {
   while (  q.size() > 0 )
   {
    int w = take();
    consume(w, temp);
    printf(" q size is now %d \n", q.size());
   }
  }
 }

 void *p_thread_function(void *arg) 
 {
  printf("Producer thread created \n");

  int itemsAdded = 0;
  int temp;
  int sleepTime;

  while ( true ) 
  {
   while ( q.size() < BUFFER_SIZE )
   {
    temp = produce(itemsAdded);

    sleepTime = 1+(int)(9.0*rand()/(RAND_MAX+1.0));
    sleep(sleepTime);

    append(temp);

    printf("Producer adds: %d \n", temp);
    printf(" q size is now %d \n", q.size());

    itemsAdded++;
   }
  }
 }

Output:

Producer adds: 1

q size is now -1

0 consumed 1

q size is now -2

1 consumed 1

q size is now -3

3 consumed 1

q size is now -4

4 consumed 0

q size is now -5

0 consumed 0

Upvotes: 1

Views: 3731

Answers (5)

Loki Astari

Reputation: 264431

A couple of issues:

  • You are doing a busy wait.
    Learn to use condition variables, so that waiting threads do not burn CPU.

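  • The int temp = (long)arg; cast is fragile.
    It only works because the loop counter is passed by value inside the pointer, and it assumes the integer fits in a void*.
    If you ever pass the address of the loop variable instead, there is no guarantee when the thread will be scheduled, so arg could point at a variable that changed a long time ago.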

  • Both the consumer and producer threads modify the queue q without gaining exclusive access.
    Any other thread can modify the queue between the test on its size and the point where you add or remove items. Even worse, another thread may try to modify the queue at the same time, and the standard containers are not thread-safe for concurrent modification.
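
For the thread-argument point above, here is a minimal sketch of one safe pattern: give every thread a pointer to its own slot in an array that outlives the thread. The names WorkerArg, worker, runWorkers and WORKER_COUNT are made up for illustration and are not part of the question's code.

```cpp
#include <pthread.h>
#include <cassert>
#include <cstddef>

const int WORKER_COUNT = 5;   // hypothetical, mirrors the five consumers

// One slot per thread, so no thread ever reads a changing loop variable.
struct WorkerArg
{
    int id;     // written by main before the thread starts
    int seen;   // written by the thread, read by main after join
};

extern "C" void* worker(void* arg)
{
    assert(arg != NULL);
    WorkerArg* self = static_cast<WorkerArg*>(arg);
    self->seen = self->id;    // stand-in for real work that uses the ID
    return NULL;
}

// Starts the workers and reports whether each one saw its own ID.
bool runWorkers()
{
    pthread_t threads[WORKER_COUNT];
    WorkerArg args[WORKER_COUNT];
    int       started = 0;

    for (int i = 0; i < WORKER_COUNT; ++i)
    {
        args[i].id   = i;
        args[i].seen = -1;
        if (pthread_create(&threads[i], NULL, worker, &args[i]) != 0)
            break;
        ++started;
    }

    bool ok = (started == WORKER_COUNT);
    for (int i = 0; i < started; ++i)
    {
        // args must stay alive until the thread using it has been joined.
        pthread_join(threads[i], NULL);
        ok = ok && (args[i].seen == i);
    }
    return ok;
}
```

The array lives in the same scope as the joins, so each pointer stays valid for the whole life of its thread.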

Try something like this:

#include <iostream>
#include <vector>
#include <queue>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include <time.h>

#define CONSUMER_COUNT  5
#define PRODUCER_COUNT  2

Part one: all the data needed by the threads

struct ThreadQueue
{
    ThreadQueue()
        : finished(false)
    {
         if (pthread_mutex_init(&mutex, NULL) != 0)
         {  throw int(1);
         }
         if (pthread_cond_init(&cond, NULL) != 0)
         {
             // Technically we should wrap the mutex in its own object,
             // so that it is destroyed automatically if initializing the
             // condition variable fails. This is left as an exercise.
             throw int(1);
         }
    }
    ~ThreadQueue()
    {
        if (pthread_cond_destroy(&cond) != 0)
        {   //throw int(1); // Do we really care?
        }
        if (pthread_mutex_destroy(&mutex) != 0)
        {   //throw int(1);
        }
    }
    std::queue<int>     data;
    pthread_mutex_t     mutex;
    pthread_cond_t      cond;
    bool                finished;
};

The consumer thread

extern "C" void* consumerThread(void* arg)
{
    ThreadQueue&     que = *static_cast<ThreadQueue*>(arg);

    while(true)
    {
        // Get the lock before proceeding (finished is also read under it)
        pthread_mutex_lock(&que.mutex);
        while(que.data.empty() && !que.finished)
        {
            // If there is no data in the queue then sleep on the condition.
            pthread_cond_wait(&que.cond, &que.mutex);

            // We may have been released here because of a signal.
            // That does not mean we got out before one of the other
            // consumer threads already stole the value from the queue.
            // So we must be in a loop and re-check the queue (and the
            // finished flag) before we proceed. If the value was already
            // stolen then we go back to sleep waiting on the condition variable.
        }

        if (que.data.empty())
        {
            // Only reachable once finished is set. Never call front() on
            // an empty queue: release the lock and exit instead.
            pthread_mutex_unlock(&que.mutex);
            break;
        }

        // We have a lock with data in the queue
        int value   = que.data.front();
        que.data.pop();

        // Use the same lock to access std::cout
        std::cout << "Consumer Got: " << value << "\n";
        pthread_mutex_unlock(&que.mutex);
    }
    return NULL;
}

The producer thread

extern "C" void* producerThread(void* arg)
{
    ThreadQueue&     que = *static_cast<ThreadQueue*>(arg);

    while(!que.finished)
    {
        // Get the lock before proceeding
        pthread_mutex_lock(&que.mutex);

        // Add a new value to the queue
        int value = rand();
        que.data.push(value);

        // Use the same lock to access std::cout
        std::cout << "Producer Push: " << value << "\n";

        // Signal a consumer to be released.
        pthread_cond_signal(&que.cond);

        // rand maintains internal state.
        // calls to rand() should therefore be protected by a mutex.
        // Again in this simple example we re-use the same mutex for protection
        int sleepTime = rand() % 5;

        // Now release the lock
        pthread_mutex_unlock(&que.mutex);
        sleep(sleepTime);
    }
    return NULL;
}

The main loop

int main()
{
    srand(time(NULL));

    ThreadQueue     queue;
    pthread_t       consumerThreads[CONSUMER_COUNT];
    pthread_t       producerThreads[PRODUCER_COUNT];

    // Count the threads that actually started, so we only join those.
    int             consumersStarted = 0;
    int             producersStarted = 0;

    try
    {
        for(int loop = 0; loop < CONSUMER_COUNT; ++loop)
        {
            if (pthread_create(&consumerThreads[loop], NULL, consumerThread, &queue) != 0)
            {   throw int(2);
            }
            ++consumersStarted;
        }
        for(int loop = 0; loop < PRODUCER_COUNT; ++loop)
        {
            if (pthread_create(&producerThreads[loop], NULL, producerThread, &queue) != 0)
            {   throw int(3);
            }
            ++producersStarted;
        }
    }
    catch(...)
    {
        // Set finished to true (under the lock) so all threads exit.
        pthread_mutex_lock(&queue.mutex);
        queue.finished = true;
        // Some consumers may be waiting on the condition,
        // so wake all of them up.
        pthread_cond_broadcast(&queue.cond);
        pthread_mutex_unlock(&queue.mutex);
    }

    /* Wait for all started threads to finish */
    for(int loop = 0; loop < consumersStarted; ++loop)
    {
        pthread_join(consumerThreads[loop], NULL);
    }
    for(int loop = 0; loop < producersStarted; ++loop)
    {
        pthread_join(producerThreads[loop], NULL);
    }
    return 0;
}

Hope I got that correct :-)
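
The code above does not enforce the BUFFER_SIZE limit from the question. As a rough sketch only, a bounded variant could use two condition variables, one for "not empty" and one for "not full". The names BoundedQueue, notEmpty, notFull, ProducerJob, produceNumbers and sumThroughQueue are assumptions made up for this example, and error checks on the pthread calls are omitted for brevity.

```cpp
#include <pthread.h>
#include <queue>
#include <cstddef>
#include <cassert>

struct BoundedQueue
{
    explicit BoundedQueue(std::size_t cap) : capacity(cap)
    {
        assert(cap > 0);
        pthread_mutex_init(&mutex, NULL);
        pthread_cond_init(&notEmpty, NULL);
        pthread_cond_init(&notFull, NULL);
    }
    ~BoundedQueue()
    {
        pthread_cond_destroy(&notFull);
        pthread_cond_destroy(&notEmpty);
        pthread_mutex_destroy(&mutex);
    }
    void push(int value)
    {
        pthread_mutex_lock(&mutex);
        while (data.size() >= capacity)          // full: producer sleeps
            pthread_cond_wait(&notFull, &mutex);
        data.push(value);
        pthread_cond_signal(&notEmpty);          // wake one consumer
        pthread_mutex_unlock(&mutex);
    }
    int pop()
    {
        pthread_mutex_lock(&mutex);
        while (data.empty())                     // empty: consumer sleeps
            pthread_cond_wait(&notEmpty, &mutex);
        int value = data.front();
        data.pop();
        pthread_cond_signal(&notFull);           // wake one producer
        pthread_mutex_unlock(&mutex);
        return value;
    }
    std::queue<int>  data;
    std::size_t      capacity;
    pthread_mutex_t  mutex;
    pthread_cond_t   notEmpty;
    pthread_cond_t   notFull;
};

struct ProducerJob { BoundedQueue* queue; int count; };

extern "C" void* produceNumbers(void* arg)
{
    ProducerJob* job = static_cast<ProducerJob*>(arg);
    for (int i = 1; i <= job->count; ++i)
        job->queue->push(i);
    return NULL;
}

// Pushes 1..count from a producer thread; the caller consumes and sums them.
long sumThroughQueue(int count, std::size_t capacity)
{
    BoundedQueue queue(capacity);
    ProducerJob  job = { &queue, count };
    pthread_t    producer;
    if (pthread_create(&producer, NULL, produceNumbers, &job) != 0)
        return -1;
    long sum = 0;
    for (int i = 0; i < count; ++i)
        sum += queue.pop();
    pthread_join(producer, NULL);
    return sum;
}
```

With a small capacity such as 4, the producer has to block and resume many times, and sumThroughQueue still sees every value exactly once.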

Upvotes: 1

JuggernautDad

Reputation: 1145

I found the issue.

The point of the assignment was to show how unreliable threading without semaphores is. Here is the code I needed to fix:

int take()
{
 int removed = q.front();
 sleep(1); // switched
 q.pop();  // these two...
 return removed;
}

I also removed the sleep timer from the producer thread. Now everything works.

The output now looks like this:

---Producer adds: 1 ---
---Producer adds: 2 ---
---Producer adds: 3 ---
---Producer adds: 4 ---
---Producer adds: 5 ---
---Producer adds: 6 ---
---Producer adds: 7 ---
---Producer adds: 8 ---
---Producer adds: 9 ---
---Producer adds: 10 ---
---Producer adds: 11 ---
---Producer adds: 12 ---
---Producer adds: 13 ---
---Producer adds: 14 ---
---Producer adds: 15 ---
---Producer adds: 16 ---
---Producer adds: 17 ---
---Producer adds: 18 ---
---Producer adds: 19 ---
---Producer adds: 20 ---
Thread 3 consumed 1 
Thread 1 consumed 1 
Thread 2 consumed 1 
Thread 4 consumed 1 
Thread 0 consumed 1 
---Producer adds: 21 ---
---Producer adds: 22 ---
---Producer adds: 23 ---
---Producer adds: 24 ---
---Producer adds: 25 ---
Thread 3 consumed 6 
Thread 4 consumed 6 
Thread 1 consumed 6 
---Producer adds: 26 ---
---Producer adds: 27 ---
---Producer adds: 28 ---
---Producer adds: 29 ---
---Producer adds: 30 ---
Thread 0 consumed 6 
Thread 2 consumed 6 
Thread 3 consumed 11 
Thread 4 consumed 11    
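
For comparison, here is a minimal sketch of what append() and take() might look like once semaphores are added. It assumes unnamed POSIX semaphores (sem_init) are available. The names SemQueue, freeSlots, usedSlots, guard and SLOT_COUNT are made up for this example, and return values, including interruption of sem_wait by a signal, are not checked.

```cpp
#include <semaphore.h>
#include <queue>
#include <cassert>

const unsigned SLOT_COUNT = 20;   // mirrors BUFFER_SIZE in the question

struct SemQueue
{
    SemQueue()
    {
        sem_init(&freeSlots, 0, SLOT_COUNT);  // producer blocks here when full
        sem_init(&usedSlots, 0, 0);           // consumers block here when empty
        sem_init(&guard, 0, 1);               // binary semaphore used as a lock
    }
    ~SemQueue()
    {
        sem_destroy(&guard);
        sem_destroy(&usedSlots);
        sem_destroy(&freeSlots);
    }
    void append(int num)
    {
        sem_wait(&freeSlots);   // reserve a free slot
        sem_wait(&guard);       // exclusive access to the queue
        data.push(num);
        sem_post(&guard);
        sem_post(&usedSlots);   // announce one more item
    }
    int take()
    {
        sem_wait(&usedSlots);   // reserve an item, so front() is never empty
        sem_wait(&guard);
        int removed = data.front();
        data.pop();             // front() and pop() happen under one lock
        sem_post(&guard);
        sem_post(&freeSlots);   // announce one more free slot
        return removed;
    }
    std::queue<int> data;
    sem_t           freeSlots;
    sem_t           usedSlots;
    sem_t           guard;
};
```

Because front() and pop() run under the same guard, two consumers cannot both read the same front element, which is what produces the repeated values in the output above.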

Upvotes: 0

Charles Salvia

Reputation: 53289

You need to learn about the concept of race conditions and mutual exclusion. Your std::queue object is a shared resource, meaning that more than one thread is operating on it - potentially at the same time. That means you have to protect it using locks (known as mutexes), so that each access is synchronized. Otherwise, you'll get what's known as a race condition, where one thread modifies data while another thread is also accessing/modifying the data, leading to an inconsistent or corrupt program state.

To prevent race conditions, you need to lock a pthread_mutex_t object before every queue access.

First, you need to create a mutex object and initialize it.

pthread_mutex_t mymutex;
pthread_mutex_init(&mymutex, NULL);

Your application code should then look something like this:

pthread_mutex_lock(&mymutex);

// Do something with queue

pthread_mutex_unlock(&mymutex);

When one thread acquires the lock, no other thread can acquire the lock. A thread that tries to acquire a lock that has already been acquired by another thread will simply wait until the lock is released. This synchronizes access to the queue, ensuring that only one thread modifies it at a time.
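
One way to make sure the unlock is never forgotten is a small scope guard that locks in its constructor and unlocks in its destructor. The following is only a sketch: ScopedLock and tryTake are hypothetical names, and the static initializer PTHREAD_MUTEX_INITIALIZER is used here in place of pthread_mutex_init.

```cpp
#include <pthread.h>
#include <queue>
#include <cassert>

// Hypothetical helper: holds the mutex for the lifetime of the object.
class ScopedLock
{
public:
    explicit ScopedLock(pthread_mutex_t& m) : mutex_(m)
    {
        int rc = pthread_mutex_lock(&mutex_);
        assert(rc == 0);
        (void)rc;   // silence unused warning when asserts are disabled
    }
    ~ScopedLock() { pthread_mutex_unlock(&mutex_); }
private:
    ScopedLock(const ScopedLock&);             // not copyable
    ScopedLock& operator=(const ScopedLock&);
    pthread_mutex_t& mutex_;
};

static std::queue<int>  q;
static pthread_mutex_t  mymutex = PTHREAD_MUTEX_INITIALIZER;

void append(int num)
{
    ScopedLock lock(mymutex);
    q.push(num);
}

// The emptiness test and the removal happen under the same lock,
// so no other thread can empty the queue in between.
bool tryTake(int& out)
{
    ScopedLock lock(mymutex);
    if (q.empty())
        return false;
    out = q.front();
    q.pop();
    return true;
}
```

A consumer would call tryTake in its loop and only consume the value when it returns true, instead of testing q.size() and calling take() as two separate steps.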

Upvotes: 5

casablanca

Reputation: 70701

STL containers such as queue are not thread-safe. You need to synchronize access to the queue object, for example, using a mutex.

Upvotes: 4
