nmogk
nmogk

Reputation: 73

Thread blocking on sem_wait causes other threads to hang

I wrote a generic queue in C which is to be used for a variety of payload types. It is a blocking queue so that consumer threads will block waiting for the queue to be populated by producer threads.

I have tested the queue code in isolation using Check, including the behavior where a thread blocks waiting for a value to be added to the queue. All of these tests passed. However, when integrating the queue into the rest of the code, I encounter a situation where the first time a thread attempts to block on the queue, all other threads hang.

To be specific, the program I am integrating with is a member of a larger ecosystem, so there is a startup script which initializes the program, which then daemonizes. The daemonized thread then creates several detached threads to perform various functions. One of these threads makes a call to sem_wait and all threads hang, including the thread which spawned the daemon.

To confirm that this call was the problem, I ran the program in non-daemon mode with the debugger, which confirmed that the sem_wait was hanging. I also added a sleep before spawning the thread which waits on the queue. In this case, the other threads progressed farther along and then hung when the sem_wait call was made.

The queue in question is only visible to this one program. Its reference is stored as a global variable. The queue is certainly empty when the call to sem_wait is performed.

The following is the queue code:

//Queue.h
#ifndef QUEUE_H
#define QUEUE_H

/*
 * Generic blocking FIFO queue.
 *
 * The include guard matters here: a translation unit that includes this
 * header both directly and through another header would otherwise see the
 * struct definitions twice and fail to compile.
 */

#include <stddef.h>     /* size_t */
#include <pthread.h>
#include <semaphore.h>

/* Optional callback invoked on an element's stored data before the queue
 * frees the storage it allocated for that element. May be NULL. */
typedef void (*freeFunction)(void *);

/* One singly linked list node holding a heap copy of an element. */
typedef struct _queueNode {
  void *data;               /* queue-owned copy, elementSize bytes long */
  struct _queueNode *next;  /* next node towards the tail, NULL at the tail */
} queueNode;


typedef struct queue {
  sem_t *logicalLength;     /* posted once per added element; takers wait on it */
  size_t elementSize;       /* number of bytes copied in and out per element */

  queueNode *head;          /* oldest element, NULL when the queue is empty */
  queueNode *tail;          /* newest element, NULL when the queue is empty */

  freeFunction freeFn;      /* optional per-element cleanup callback */
  pthread_mutex_t *queueLock; /* guards head/tail and the node list */
} queue_t;

/* Sets up a caller-allocated queue_t for elements of elementSize bytes. */
void queue_initialize(queue_t *queue, size_t elementSize, freeFunction freeFn);
void queue_destroy(queue_t *queue); // Removes all elements from the queue

int queue_size(queue_t *queue); // Returns the number of elements in the queue

void queue_add(queue_t *queue, void *element);          // Adds to tail
int queue_take(queue_t *queue, void *elementBuffer); // Returns/removes head, blocks if empty

#endif /* QUEUE_H */


//Queue.c
#include <assert.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#include "Queue.h"

/*
 * Prepare a caller-allocated queue_t for use.
 *
 * queue       - queue object to initialize; must not be NULL.
 * elementSize - size in bytes of every element stored; must be > 0.
 * freeFn      - optional callback run on an element's data before the queue
 *               frees it; may be NULL.
 *
 * Allocates the semaphore and the mutex on the heap and initializes both.
 * The function returns void, so it cannot report failure: if an allocation
 * or a POSIX initialization call fails, the process is aborted rather than
 * left with a half-initialized queue that later calls would dereference.
 */
void queue_initialize(queue_t *queue, size_t elementSize, freeFunction freeFn) {

  assert(elementSize > 0);
  assert(queue != NULL);

  queue->elementSize = elementSize;
  queue->head = NULL;
  queue->tail = NULL;
  queue->freeFn = freeFn;

  queue->logicalLength = calloc(1, sizeof *queue->logicalLength);
  queue->queueLock = calloc(1, sizeof *queue->queueLock);
  if (queue->logicalLength == NULL || queue->queueLock == NULL) {
    abort();
  }

  /* pshared = 0: shared between threads of this process; initial count 0
   * so that the first taker blocks until something is added. */
  if (sem_init(queue->logicalLength, 0, 0) != 0) {
    abort();
  }

  pthread_mutexattr_t attr;
  if (pthread_mutexattr_init(&attr) != 0 ||
      pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE) != 0 ||
      pthread_mutex_init(queue->queueLock, &attr) != 0) {
    abort();
  }

  /* The attribute object is only needed while creating the mutex. */
  pthread_mutexattr_destroy(&attr);

}

/*
 * Release everything the queue owns: every remaining node (running freeFn
 * on its data first when one was supplied), then the mutex and the
 * semaphore together with their heap storage. The queue_t object itself is
 * not freed. No locking is performed here.
 */
void queue_destroy(queue_t *queue) {

    assert(queue != NULL);

    /* Pop nodes off the front until the list is exhausted. */
    for (queueNode *victim = queue->head; victim != NULL; victim = queue->head) {

        queue->head = victim->next;

        if (queue->freeFn != NULL) {
            queue->freeFn(victim->data);
        }

        free(victim->data);
        free(victim);
    }

    queue->head = NULL;
    queue->tail = NULL;

    pthread_mutex_destroy(queue->queueLock);
    sem_destroy(queue->logicalLength);

    free(queue->queueLock);
    free(queue->logicalLength);

}

/*
 * Append a copy of *element to the tail of the queue and wake one taker.
 *
 * queue   - initialized queue; must not be NULL.
 * element - points to queue->elementSize bytes to copy; must not be NULL.
 *
 * The element is copied byte-for-byte into queue-owned storage, so the
 * caller keeps ownership of its own buffer. The function returns void and
 * cannot report failure, so an allocation failure aborts the process
 * instead of writing through a NULL pointer.
 */
void queue_add(queue_t *queue, void *element) {

    assert(queue != NULL);
    assert(element != NULL);

    /* Build the node before taking the lock: allocation and copying touch
     * no shared list state, so the critical section stays short. */
    queueNode *node = calloc(1, sizeof *node);
    if (node == NULL) {
        abort();
    }

    node->data = calloc(1, queue->elementSize);
    if (node->data == NULL) {
        abort();
    }

    memcpy(node->data, element, queue->elementSize);
    node->next = NULL;

    pthread_mutex_lock(queue->queueLock);

        if (queue->head == NULL) {

            /* Empty queue: the new node is both head and tail. */
            queue->head = queue->tail = node;

        } else {

            queue->tail->next = node;
            queue->tail = node;

        }

        /* One post per linked node keeps the semaphore count equal to the
         * number of elements available to takers. */
        sem_post(queue->logicalLength);

    pthread_mutex_unlock(queue->queueLock);

}

/*
 * Detach the head node under the queue lock and copy its payload
 * (queue->elementSize bytes) into elementBuffer. When the queue is empty
 * the function returns without touching elementBuffer.
 *
 * NOTE(review): freeFn is run on the node's payload after that payload has
 * been byte-copied into elementBuffer. If freeFn releases memory the payload
 * points to, the caller's copy would be left with dangling pointers --
 * confirm the intended ownership.
 */
void queue_removeNode(queue_t *queue, void *elementBuffer) {

    pthread_mutex_lock(queue->queueLock);

    queueNode *front = queue->head;

    if (front != NULL) {

        memcpy(elementBuffer, front->data, queue->elementSize);

        /* Removing the only node leaves the queue without a tail. */
        if (front == queue->tail) {
            queue->tail = NULL;
        }

        queue->head = front->next;

        if (queue->freeFn != NULL) {
            queue->freeFn(front->data);
        }

        free(front->data);
        free(front);
    }

    pthread_mutex_unlock(queue->queueLock);

}

/*
 * Remove the head element, blocking while the queue is empty.
 *
 * queue         - initialized queue; must not be NULL.
 * elementBuffer - receives queue->elementSize bytes; must not be NULL.
 *
 * Returns EXIT_SUCCESS once the semaphore has been acquired and the head
 * element handed to queue_removeNode, or EXIT_FAILURE if sem_wait fails for
 * a reason other than being interrupted by a signal handler.
 */
int queue_take(queue_t *queue, void *elementBuffer) {

    assert(queue != NULL);
    assert(elementBuffer != NULL);

    /* sem_wait returns -1 with errno == EINTR when a signal handler
     * interrupts it. In that case nothing was acquired, so wait again
     * rather than going on to read from a possibly empty queue and
     * reporting success with an unfilled buffer. */
    while (sem_wait(queue->logicalLength) != 0) {
        if (errno != EINTR) {
            return EXIT_FAILURE;
        }
    }

    queue_removeNode(queue, elementBuffer);

    return EXIT_SUCCESS;
}

The following is the code which revealed the problem:

//fei.h
...
#include "Queue.h"
extern queue_t *commandQueue;
...

//fei.c
#include "fei.h"
#include "commandHandler.h"
#include "Queue.h"

/* File-scope definition of the command queue pointer that fei.h declares
 * extern. As a file-scope object it is NULL until something assigns it. */
queue_t *commandQueue;

/*
 * Program entry point: optionally daemonizes, loads configuration, creates
 * the command queue, starts the receiveCommands and processCommands threads
 * as detached threads, then waits for a connection and calls
 * receiveCCSDSPackets().
 */
int main (int argc, char **argv){

    int debugFlag = handleOpts(argc, argv);

    /* Daemonize only when the debug flag is not set. */
    if(!debugFlag){
        /* NOTE(review): rc is declared inside this block, so it is out of
         * scope at the setConfigValues() assignment below. */
        int rc = daemonize();
        if(rc != 0){
            exit(rc);
        }
    }

    rc = setConfigValues();
    if(rc){
        exit(rc);
    } 

    /* NOTE(review): this declares a new local variable named commandQueue
     * that shadows the file-scope commandQueue. Only the local receives the
     * allocated, initialized queue; nothing in the code shown assigns the
     * file-scope pointer. */
    queue_t *commandQueue = calloc(1, sizeof(queue_t));
    queue_initialize(commandQueue, sizeof(commandPack_t), commandFree);

    if(getPortIsock() == 0){ // This is a simple config value
        exit(EXIT_FAILURE);
    }

    /* Ignore SIGPIPE instead of taking its default action. */
    signal(SIGPIPE, SIG_IGN);

    /* Both worker threads are created detached and receive NULL as their
     * argument; the same pthread_t is reused because the ids are not used
     * again in this function. */
    pthread_t id;
    pthread_attr_t attr;
    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
    pthread_create(&id, &attr, receiveCommands, NULL);
    pthread_create(&id, &attr, processCommands, NULL);

    if(!setSocketIsock()){
        exit(1);
    }
    /* Poll every 50 ms until checkIfConnectedToSct() reports true. */
    while(!checkIfConnectedToSct())
        usleep(50000);

    receiveCCSDSPackets();
    exit (0);
}

// commandHandler.c
#include "Queue.h"
#include "fei.h"
#include "commandHandler.h"

/* NOTE(review): fei.c also defines commandQueue, and fei.h (included above)
 * already declares it extern, so this second definition looks unnecessary
 * and may cause a duplicate-definition link error -- confirm. */
queue_t *commandQueue;

/*
 * Producer thread body: obtains a connection, marks connectedToSct, then
 * loops forever reading commands from CSOCKET. Each successfully received
 * command is copied into commandQueue; after any other result the thread
 * sleeps 5 ms before trying again.
 */
void *receiveCommands(){

    getNewCsockConnection();
    connectedToSct = 1;

    for(;;){
        commandPack_t incoming;
        int status = getCommand(CSOCKET, &incoming);

        if(status != RECEIVE_SUCCESS){
            usleep(5000);
            continue;
        }

        /* queue_add copies the struct, so reusing the local is safe. */
        queue_add(commandQueue, &incoming);
    }
    return NULL;
}

/*
 * Consumer thread body: loops forever, taking one command at a time from
 * commandQueue (blocking while it is empty), dispatching on its command
 * field and then releasing it with commandFree.
 *
 * NOTE(review): the queue's removal path also runs the queue's free callback
 * on the stored copy of the command; if that callback is commandFree, the
 * command's resources may be released twice -- confirm.
 */
void *processCommands(){
    for(;;){
        commandPack_t pending;

        /* Blocks until a command is available */
        queue_take(commandQueue, &pending);

        switch(pending.command){
            // Command processing
        }

        commandFree(&pending);
    }
    return NULL;
}

The receiveCommands function is the producer thread and the processCommands function is the consumer thread. These are the only places in the code base which refer to the commandQueue. Although the exact point varies from run to run, execution of the main thread rarely gets beyond the setSocketIsock() condition check.

Any insight is appreciated.

Upvotes: 3

Views: 1182

Answers (1)

JS1
JS1

Reputation: 4767

In main(), you have this line:

queue_t *commandQueue = calloc(1, sizeof(queue_t));

This makes commandQueue a local variable of main. Your other functions use a global variable also named commandQueue. This makes me think that you did not intend for commandQueue to be redeclared in main. So change the above line to this:

commandQueue = calloc(1, sizeof(queue_t));

Upvotes: 1

Related Questions