user9455344

How to make a queue thread-safe

Two threads working on this queue at the same time could make it fail. What will you do to make concurrent accesses work correctly? Should the queue make the threads wait if they try to remove an element from an empty queue or add one to a full one?

#include <stdlib.h>

// circular array
typedef struct _queue {
    int size;
    int used;
    int first;
    void **data;
} _queue;

#include "queue.h"

queue q_create(int size) {
    queue q = malloc(sizeof(_queue));

    q->size  = size;
    q->used  = 0;
    q->first = 0;
    q->data = malloc(size*sizeof(void *));

    return q;
}

int q_elements(queue q) {
    return q->used;
}

int q_insert(queue q, void *elem) {
    if(q->size==q->used) return 0;

    q->data[(q->first+q->used) % q->size] = elem;    
    q->used++;

    return 1;
}

void *q_remove(queue q) {
    void *res;
    if(q->used==0) return NULL;

    res=q->data[q->first];

    q->first=(q->first+1) % q->size;
    q->used--;

    return res;
}

void q_destroy(queue q) {
    free(q->data);
    free(q);
}

The objective of this queue is to manage the input and output of the compression tool I'm trying to develop.

Upvotes: 0

Views: 2083

Answers (1)

Tsakiroglou Fotis

Reputation: 1031

Thread safety means that you have to protect any shared data. Here the shared data is the queue that your pointer refers to. So, in general, any time you operate on the queue you need to protect it and prevent multiple threads from accessing it at the same time. One good way is to use a mutex together with condition variables. The mutex keeps other threads out of the critical section, and a condition variable lets a thread sleep until the state it is waiting for changes.

I can't see the thread code here, so let's assume you have a thread that writes to a queue. First you have to declare the mutex and the condition variable.

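#include <pthread.h>
#include <stdlib.h>   // malloc
#include <string.h>   // memcpy

// person_t, random_person(), push(), the_queue, clean_memory2() and
// NAME_LENGTH are assumed to be defined elsewhere in the program.
typedef enum { READY, NOT_READY } que_state_t;

pthread_mutex_t mutex2 = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t w_condition = PTHREAD_COND_INITIALIZER;

que_state_t que_write_state = READY;  // this is just an enum, guarded by mutex2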


void* writer_thread_v2(void* t)
{
    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
    pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
    int i = 0;
    while ( 1 )
    {
        pthread_mutex_lock(&mutex2);    
        while( que_write_state == NOT_READY )
        {
            pthread_cond_wait(&w_condition , &mutex2);
        }
        que_write_state = NOT_READY;
        person_t* doe = NULL;
        pthread_cleanup_push(clean_memory2, &doe);
        doe = (person_t*) malloc(sizeof(person_t));

        person_t static_person = random_person();
        doe->age_bracket = static_person.age_bracket;
        memcpy(doe->name , static_person.name , NAME_LENGTH);
        push(the_queue , doe );
        pthread_cleanup_pop(0);

        que_write_state = READY;
        pthread_mutex_unlock(&mutex2);
        pthread_cond_signal(&w_condition);
        i++;
    }
    pthread_exit(0);
}

Here the thread enters a while loop. Each iteration starts by taking the lock and waiting until the queue is ready:

    pthread_mutex_lock(&mutex2);    // lock the mutex
    while( que_write_state == NOT_READY )    // re-check the enum after every wakeup
    {
        pthread_cond_wait(&w_condition , &mutex2);   // releases mutex2 while waiting
    }
    que_write_state = NOT_READY;      // mark the queue as busy

Now all threads of this kind stop and wait until the queue operation finishes and the mutex is released. This is where a serious memory leak can occur: what if a thread creates an object and is cancelled before it adds the data to the queue? This is why the code contains:

pthread_cleanup_push(clean_memory2, &doe);
{
    // Inside this block the thread mallocs the heap memory it needs and
    // adds the data to the queue. If the thread is cancelled at any point
    // in here, clean_memory2() is called, which frees doe.
}
pthread_cleanup_pop(0);
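clean_memory2() itself is not shown above, so here is a rough sketch of what such a handler could look like. The names free_pending, unlock_mutex, worker and cancel_demo are made up for illustration. It also registers a second handler that unlocks the mutex, on the assumption that you do not want a cancelled thread to leave it locked: pthread_cond_wait is a cancellation point, and the mutex is re-acquired before the cleanup handlers run.

```c
#include <pthread.h>
#include <stdlib.h>

/* Hypothetical handler: frees the object the thread was still building. */
static void free_pending(void *arg) {
    void **slot = arg;
    free(*slot);          /* free(NULL) is a no-op, so this is always safe */
    *slot = NULL;
}

/* Hypothetical handler: releases the mutex held at cancellation time. */
static void unlock_mutex(void *arg) {
    pthread_mutex_unlock(arg);
}

static pthread_mutex_t demo_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  demo_cond = PTHREAD_COND_INITIALIZER;
static int demo_ready = 0;   /* never set here, so the worker blocks */

static void *worker(void *unused) {
    (void)unused;
    void *item = NULL;

    pthread_mutex_lock(&demo_lock);
    pthread_cleanup_push(unlock_mutex, &demo_lock);
    pthread_cleanup_push(free_pending, &item);

    item = malloc(64);
    while (!demo_ready)
        pthread_cond_wait(&demo_cond, &demo_lock);  /* cancellation point */

    /* This demo has no queue, so both handlers also run on the normal
       path. With a real queue that takes ownership of item, the first
       pop would be pthread_cleanup_pop(0), as in the answer. */
    pthread_cleanup_pop(1);   /* free_pending */
    pthread_cleanup_pop(1);   /* unlock_mutex */
    return NULL;
}

/* Starts a worker, cancels it, and checks that the mutex was released.
   Returns 0 on success, -1 otherwise. */
int cancel_demo(void) {
    pthread_t t;
    void *ret;

    if (pthread_create(&t, NULL, worker, NULL) != 0) return -1;
    pthread_cancel(t);
    pthread_join(t, &ret);
    if (ret != PTHREAD_CANCELED) return -1;

    if (pthread_mutex_trylock(&demo_lock) != 0) return -1;
    pthread_mutex_unlock(&demo_lock);
    return 0;
}
```

Handlers run in the reverse order of registration, so the memory is freed first and the mutex is unlocked last.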

pthread_cleanup_push protects you from memory leaks while you create the object you want to store and complete the insertion. The block it guards ends at the matching pthread_cleanup_pop. After pushing to the queue, the thread releases the mutex and signals one of the waiting threads.

    que_write_state = READY;   // set the switch back to ready
    pthread_mutex_unlock(&mutex2);   // unlock the mutex

    pthread_cond_signal(&w_condition); // wake one waiting thread
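Applied to the circular queue from the question, the same idea could look roughly like the sketch below. It is only one possible design, not a definitive implementation: the names ts_queue and tsq_* are invented here, the mutex and both condition variables live inside the queue struct, and the queue is assumed to be created with size > 0. Insert blocks while the queue is full and remove blocks while it is empty.

```c
#include <pthread.h>
#include <stdlib.h>

/* Hypothetical thread-safe variant of the question's circular array. */
typedef struct ts_queue {
    int size, used, first;
    void **data;
    pthread_mutex_t lock;       /* protects every field above */
    pthread_cond_t  not_empty;  /* signalled after an insert */
    pthread_cond_t  not_full;   /* signalled after a remove */
} ts_queue;

ts_queue *tsq_create(int size) {
    ts_queue *q = malloc(sizeof *q);
    if (q == NULL) return NULL;
    q->data = malloc((size_t)size * sizeof(void *));
    if (q->data == NULL) { free(q); return NULL; }
    q->size = size;
    q->used = 0;
    q->first = 0;
    pthread_mutex_init(&q->lock, NULL);
    pthread_cond_init(&q->not_empty, NULL);
    pthread_cond_init(&q->not_full, NULL);
    return q;
}

/* Blocks while the queue is full. */
void tsq_insert(ts_queue *q, void *elem) {
    pthread_mutex_lock(&q->lock);
    while (q->used == q->size)               /* loop guards against spurious wakeups */
        pthread_cond_wait(&q->not_full, &q->lock);
    q->data[(q->first + q->used) % q->size] = elem;
    q->used++;
    pthread_cond_signal(&q->not_empty);      /* wake one waiting consumer */
    pthread_mutex_unlock(&q->lock);
}

/* Blocks while the queue is empty. */
void *tsq_remove(ts_queue *q) {
    pthread_mutex_lock(&q->lock);
    while (q->used == 0)
        pthread_cond_wait(&q->not_empty, &q->lock);
    void *res = q->data[q->first];
    q->first = (q->first + 1) % q->size;
    q->used--;
    pthread_cond_signal(&q->not_full);       /* wake one waiting producer */
    pthread_mutex_unlock(&q->lock);
    return res;
}

/* Call only after all threads have stopped using the queue. */
void tsq_destroy(ts_queue *q) {
    pthread_cond_destroy(&q->not_full);
    pthread_cond_destroy(&q->not_empty);
    pthread_mutex_destroy(&q->lock);
    free(q->data);
    free(q);
}
```

With this layout the callers need no locking of their own. If you prefer the original non-blocking behaviour (return 0 or NULL instead of waiting), keep the lock and replace each while loop with an early unlock and return.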

Thank you.

Upvotes: 1
