Manatee Pink

Reputation: 253

In C, is storing data in local variables in threads akin to creating a local copy? AKA Does this Threadpool synchronization make sense?

As I explain in this question, I am having some issues with my Threadpool implementation.

Since the previous question is a bit too broad, I thought I'd also ask a very specific question.

The way I implemented the Threadpool is that worker threads go through 2 phases. First, they wait for the main thread to give the go signal (the sleep phase). After that, the worker threads take jobs from a queue and execute them asynchronously (the work phase). When the queue is empty, the workers go back to the sleep phase.

The way I implemented that is via 2 mutexes that only the main thread manages directly. All the worker threads have access to is a pointer to one of the mutexes.

Before creating the worker threads, the main thread creates the 2 mutexes and locks the first one. Then it creates the threads, which try to lock the mutex and therefore go to sleep. After creating the job queue, and before unlocking mutex 1, the main thread locks mutex 2 and points the workers' shared pointer at mutex 2. Then it unlocks mutex 1; the worker threads lock the mutex, immediately release it again, and go to work. After that they try to lock mutex 2 and go to sleep. This repeats as often as desired, with the main thread ping-ponging between the 2 mutexes.

One thing this scheme relies on is that worker threads store a local copy of the mutex pointer so that they can lock and unlock on the same mutex while the main thread can safely change the shared pointer.

Here is the related code:

// function for worker threads
// the 2 nested loops correspond to a waiting phase and a work phase
static void* worker_thread(void* arguments){
    WorkerThread* worker_thread = (WorkerThread*) arguments;
    SynchronizationHandle* sync_handle = worker_thread->sync_handle;

    // the outer loop deals with waiting on the main thread to start work or terminate
    while(true){
        // address is saved to a local variable so that the main thread can change the shared pointer to the other mutex without a race condition
        pthread_mutex_t* const barrier_mutex = (pthread_mutex_t* const)sync_handle->barrier_mutex;
        // try to lock mutex and go to sleep and wait for main thread to unlock it
        pthread_mutex_lock(barrier_mutex);
        // unlock it for other threads
        pthread_mutex_unlock(barrier_mutex);

        // the inner loop executes jobs from the work queue and checks in between if it should terminate
        while(true){
            if(sync_handle->terminate)
                pthread_exit(0);
            if(!WorkerThread_do_job(sync_handle, worker_thread->ID))
                break;
        }
    }
}

typedef struct{
    pthread_t* thread_handles;
    WorkerThread* worker_threads;
    uint worker_threads_count;

    SynchronizationHandle* sync_handle;
    pthread_mutex_t barrier_mutexes[2];
    uint barrier_mutex_cursor;
} ThreadPool;

static void ThreadPool_wakeup_workers(ThreadPool* pool){
    // ASSUMPTION: thread executing already owns pool->barrier_mutexes + pool->barrier_mutex_cursor

    // compute which mutex to switch to next
    uint offset = (pool->barrier_mutex_cursor + 1)%2;

    // lock next mutex before wake up so that worker threads can't get hold of it before main thread
    pthread_mutex_lock(pool->barrier_mutexes + offset);

    // change address to the next mutex before unlocking the previous mutex, otherwise race condition
    pool->sync_handle->barrier_mutex = pool->barrier_mutexes + offset;

    // unlocking the previous mutex "wakes up" the worker threads since they are trying to lock it
    // hence why the assumption needs to hold
    pthread_mutex_unlock(pool->barrier_mutexes + pool->barrier_mutex_cursor);
    
    pool->barrier_mutex_cursor = offset;
}

I would expect that storing the pointer in a local variable guarantees that the optimizing compiler locks the mutex through the local variable and does not, for whatever reason, use the pointer from the handle. Is this expectation correct?

As I explain in the question I linked to, the expected behaviour happens 999 times out of 1000 procedure calls. But every once in a while I get threads that don't seem to execute their jobs while still emptying the queue and signaling the main thread that their work is done. The only non-determinism in the whole process is the OS scheduler, so I have no idea why this error happens so rarely.

Any help is much appreciated, thank you.

Upvotes: 0

Views: 141

Answers (3)

Craig Estey

Reputation: 33601

You don't need a separate "sleep" mode.

Telling a thread to "stop" via a signal is clunky. Better to send a message with a "stop" flag in it. This allows for graceful shutdown (without the race condition issues of signals).


To prevent threads from going into "spin" loops that burn CPU, we want a thread to sleep and wake up at just the right time for its next work request.

While we can use condition variables (e.g. pthread_cond_wait/pthread_cond_signal), there are performance issues.

Sending a pthread_cond_signal will only wake up one thread. And, it may be the "wrong" thread (or always the same one). So, we really want to do a pthread_cond_broadcast and wake up all sleeping threads.

But, if the broadcast is sent just before a thread goes to sleep, that thread will not be signaled. And, another thread may have already "cleared" the condition. So, the second thread will have "missed" the wakeup.

And, with pthread_cond_broadcast, it wakes up all sleeping threads, even though only one may "get" the request block [on a queue]. The others will go back to sleep, having been woken up needlessly.

In other words, the woken up threads will race to grab the next element from the queue that the condition variable is used for.


This may be okay if each worker thread has its own private [per-thread] work request queue.

But, having a separate queue for each thread can result in a request waiting a long time if the queue it's sitting on belongs to a thread that is currently working on a request that takes an extended amount of time to complete.

So, the pending request just sits there, on the queue, even if other worker threads are idle.

One way to solve this is to allow "work stealing" by other threads. That is, if their own queue is empty, they go looking at the request queues of other workers. They, then, "steal" the work by dequeuing it from another thread's queue.
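
For illustration, here is a minimal C sketch of such a stealing loop (hypothetical names and a deliberately simplified mutex-protected list, not code from this answer; each worker's lock is assumed to be initialized elsewhere):

    #include <pthread.h>
    #include <stddef.h>

    typedef struct work { struct work *next; } work_t;

    typedef struct worker {
        pthread_mutex_t lock;               // protects this worker's queue
        work_t *head;                       // per-thread queue of pending work
    } worker_t;

    // pop from a queue; used for one's own queue and for stealing alike
    static work_t *queue_pop(worker_t *w) {
        pthread_mutex_lock(&w->lock);
        work_t *job = w->head;
        if (job != NULL)
            w->head = job->next;
        pthread_mutex_unlock(&w->lock);
        return job;
    }

    // own queue first, then try the other workers' queues
    static work_t *next_work(worker_t *self, worker_t *workers, int nworkers) {
        work_t *job = queue_pop(self);
        if (job != NULL)
            return job;
        for (int i = 0; i < nworkers; ++i) {
            worker_t *victim = &workers[i];
            if (victim != self && (job = queue_pop(victim)) != NULL)
                return job;                 // "stolen" from another thread's queue
        }
        return NULL;                        // nothing anywhere; caller can sleep
    }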

That is a known, valid way to do it. But, a much simpler way is to have all threads use a single work request queue. It has less overhead than the work stealing approach and can be more robust.


What I've found works, in practice (on a commercial grade shipping product), is to use IPC message queues [instead of condition variables].

I use these in conjunction with mutex protected doubly linked lists of "work request" structs.

They work cleanly with multiple producers and multiple consumers.

Although things have changed a lot, when I first started using IPC messages, circa 2004, msgrcv was the only mechanism that guaranteed that the system would wake up the thread and do an immediate reschedule so that the receiver would be run ASAP.

Here is a simplified [and working] example of what I mean:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <stdint.h>
#include <stdatomic.h>
#include <pthread.h>
#include <time.h>
#include <stdarg.h>
#include <sys/ipc.h>
#include <sys/msg.h>

#define sysfault(_fmt...) \
    do { \
        _dbgprt(_fmt); \
        _sysfault(); \
    } while (0)

#if DEBUG
#define dbgprt(_fmt...) \
    _dbgprt(_fmt)
#else
#define dbgprt(_fmt...) \
    do { } while (0)
#endif

#define inline_always   static inline __attribute__((__always_inline__))
#define inline_never    __attribute__((__noinline__))

enum worktype {
    WORK_SUM,
    WORK_AVG,
    WORK_FACT,
    WORK_STOP,
};
#define WORKTYPEMAX     (WORK_STOP + 1)

typedef unsigned int u32;
typedef long long s64;
typedef unsigned long long u64;

// work descriptor
typedef struct work {
    struct work *work_prev;                 // pointer to previous entry
    struct work *work_next;                 // pointer to next entry
    int work_bufno;                         // buffer number

    enum worktype work_type;                // type of work

    void *work_data;                        // pointer to work data
    u32 work_count;                         // count of work data

    u32 work_xid;                           // which task performed work
    u64 work_result;                        // work result
} work_t;

// work queue
typedef struct queue {
    struct queue *que_next;                 // pointer to next queue
    const char *que_sym;                    // queue name

    work_t *que_head;                       // head of work list
    work_t *que_tail;                       // tail of work list
    int que_count;                          // number of elements in queue

    pthread_mutex_t que_mutex;              // lock
    long que_msqid;                         // msgsnd/msgrcv msqid

    int que_stop;                           // 1=stop
} queue_t;

queue_t *queue_list;                        // list of queues
queue_t queue_free;                         // free queue
queue_t queue_active;                       // active queue
queue_t queue_result;                       // result queue
int quefd;                                  // message queue identifier
int questop;                                // 1=queues requested to stop

// task control
typedef struct {
    pthread_t tsk_tid;                      // task thread id
    u32 tsk_xid;                            // task id
    void *(*tsk_fnc)(void *);               // task function

    FILE *tsk_xfdbg;                        // debug

    u32 tsk_rand;                           // random number seed

    int tsk_stop;                           // 1=task got stop request
    int tsk_done;                           // 1=task done
} tsk_t;

tsk_t *tsklist;                             // list of tasks
tsk_t *tskm;                                // master task
__thread tsk_t *tskcur;                     // current task

#define TSKFORALL(_tsk) \
    tsk_t *_tsk = &tsklist[0];  _tsk < &tsklist[opt_T];  ++_tsk

// msgsnd/msgrcv message
typedef struct {
    long xmsg_msqid;                        // msgsnd/msgrcv msqid
    u32 xmsg_xid;                           // message sender task id
    work_t *xmsg_work;                      // pointer to work
    int xmsg_stop;                          // 1=stop queue request
} xmsg_t;

const char *opt_L;                          // output trace directory
int opt_M;                                  // number of worker message structs
int opt_N;                                  // number of messages to send
int opt_T;                                  // number of workers

void *worker_thread(void *vptr);

double tsczero;

double
tscgetf(void)
{
    struct timespec ts;
    double sec;

    clock_gettime(CLOCK_MONOTONIC,&ts);

    sec = ts.tv_nsec;
    sec /= 1e9;
    sec += ts.tv_sec;

    sec -= tsczero;

    return sec;
}

// _sysfault -- terminate program abnormally
inline_never void
_sysfault(void)
{
        exit(1);
}

inline_never void
_dbgprt(const char *fmt,...)
{
    va_list ap;
    char buf[1000];
    char *bp = buf;

    double tsc = tscgetf();
    u32 xid = (tskcur != NULL) ? tskcur->tsk_xid : 0;
    bp += sprintf(bp,"[%.9f] t%4.4u ",tsc,xid);

    va_start(ap,fmt);
    bp += vsprintf(bp,fmt,ap);
    va_end(ap);

    FILE *xferr = NULL;
    if (tskcur != NULL)
        xferr = tskcur->tsk_xfdbg;
    if (xferr == NULL)
        xferr = stderr;

    // send line atomically
    fputs(buf,xferr);
}

inline_always int
workbufno(work_t *work)
{
    int bufno;

    if (work != NULL)
        bufno = work->work_bufno;
    else
        bufno = -1;

    return bufno;
}

#define QDS(_sym,_reason) \
    _dbgprt("xmsgstat: " #_sym " %lld -- " _reason "\n", \
        (s64) ds._sym)

#define QDS_TOD(_sym,_reason) \
    _dbgprt("xmsgstat: " #_sym " %.9f -- " _reason "\n", \
        (double) ds._sym - tsczero)

#define QINFO(_sym,_reason) \
    _dbgprt("xmsgstat: " #_sym " %lld -- " _reason "\n", \
        (s64) info._sym)

// xmsgstat -- show state
void
xmsgstat(int full,const char *who)
{

    _dbgprt("xmsgstat: ENTER (from %s)\n",who);

    struct msqid_ds ds;
    msgctl(quefd,IPC_STAT,&ds);

    QDS_TOD(msg_stime,"time of last msgsnd");
    QDS_TOD(msg_rtime,"time of last msgrcv");
    QDS_TOD(msg_ctime,"time of last change");
    QDS(__msg_cbytes,"current number of bytes in queue");
    QDS(msg_qnum,"current number of messages in queue");
    QDS(msg_qbytes,"maximum number of bytes allowed in queue");

    if (full) {
#if 1
        size_t maxmsg = ds.msg_qbytes / sizeof(xmsg_t);
        if (opt_M > maxmsg)
            opt_M = maxmsg;
#endif

        struct msginfo info;
        msgctl(quefd,MSG_INFO,(void *) &info);
        QINFO(msgpool,"size in KB of buffer pool used to hold message data");
        QINFO(msgmap,"max number of entries in message map (unused)");
        QINFO(msgmax,"max number of bytes in a single message");
        QINFO(msgmnb,"max number of bytes that can be written to queue");
        QINFO(msgmni,"max number of message queues");
        QINFO(msgssz,"message segment size (unused)");
        QINFO(msgtql,"max number of messages on all queues in system (unused)");
        QINFO(msgseg,"max number of segments (unused)");
    }

    _dbgprt("xmsgstat: EXIT\n");
}

// queput -- send work
void
queput(queue_t *que,work_t *work)
{
    xmsg_t xmsg = { 0 };

    dbgprt("queput: ENTER que_sym=%s work_bufno=%u\n",
        que->que_sym,workbufno(work));

    // identify who sent the message
    xmsg.xmsg_xid = tskcur->tsk_xid;
    work->work_xid = tskcur->tsk_xid;

    pthread_mutex_lock(&que->que_mutex);

    work_t *tail = que->que_tail;
    work_t *head = que->que_head;

    dbgprt("queput: QPTRS que_head=%d que_tail=%d que_count=%d que_msqid=%ld quefd=%d\n",
        workbufno(head),workbufno(tail),que->que_count,que->que_msqid,quefd);

    work->work_prev = tail;
    if (tail != NULL)
        tail->work_next = work;
    que->que_tail = work;

    work->work_next = NULL;
    if (head == NULL)
        que->que_head = work;

    que->que_count += 1;

    pthread_mutex_unlock(&que->que_mutex);

    do {
        xmsg.xmsg_msqid = atomic_load(&que->que_msqid);
        if (xmsg.xmsg_msqid <= 0)
            break;

        // send message
        dbgprt("queput: MSGSND\n");
        int err = msgsnd(quefd,&xmsg,sizeof(xmsg_t) - sizeof(long),0);
        if (err < 0)
            sysfault("queput: msgsnd fault -- %s\n",strerror(errno));
    } while (0);

    dbgprt("queput: EXIT\n");
}

// queget -- dequeue next work
work_t *
queget(queue_t *que,int flags)
{
    int err;
    xmsg_t xmsg;
    work_t *work;

    dbgprt("queget: ENTER que_sym='%s' que_msqid=%ld flags=%8.8X\n",
        que->que_sym,que->que_msqid,flags);

    // wait for message
    do {
        int msqid = atomic_load(&que->que_msqid);
        if (msqid <= 0)
            break;

        // already received stop message
        if (tskcur->tsk_stop) {
            dbgprt("queget: TSKSTOP/ALREADY\n");
            break;
        }

#if 1
        xmsg.xmsg_msqid = msqid;
#endif

        dbgprt("queget: MSGRCV\n");
        err = msgrcv(quefd,&xmsg,sizeof(xmsg_t) - sizeof(long),msqid,flags);
        if (err < 0) {
            if (errno == ENOMSG)
                break;
            sysfault("queget: msgrcv fault -- %s\n",strerror(errno));
        }

        // process stop message
        if (xmsg.xmsg_stop) {
            dbgprt("queget: TSKSTOP/NOW\n");
            tskcur->tsk_stop = 1;
        }
    } while (0);

    pthread_mutex_lock(&que->que_mutex);

    do {
        work = que->que_head;

        // empty queue
        if (work == NULL)
            break;

        work_t *next = work->work_next;
        que->que_head = next;
        if (next != NULL)
            next->work_prev = NULL;

        if (que->que_tail == work)
            que->que_tail = NULL;

        que->que_count -= 1;
    } while (0);

    pthread_mutex_unlock(&que->que_mutex);

    dbgprt("queget: EXIT work=%d\n",workbufno(work));

    return work;
}

void
queinit(queue_t *que,const char *sym,long msqid)
{

    dbgprt("queinit: ENTER sym='%s' msqid=%ld\n",sym,msqid);

    pthread_mutex_init(&que->que_mutex,NULL);
    que->que_sym = sym;
    que->que_msqid = msqid;

    que->que_next = queue_list;
    queue_list = que;

    dbgprt("queinit: EXIT\n");
}

// queinitall -- initialize message queues
void
queinitall(void)
{

    dbgprt("queinitall: ENTER\n");

    quefd = msgget(IPC_PRIVATE,IPC_CREAT | 0600);
    if (quefd < 0)
        sysfault("queinit: msgget fault -- %s\n",strerror(errno));

    xmsgstat(1,"queinit");

    queinit(&queue_result,"queue_result",2);
    queinit(&queue_active,"queue_active",1);
    queinit(&queue_free,"queue_free",0);

    work_t *work;
    for (int idx = 0;  idx < opt_M;  ++idx) {
        work = calloc(1,sizeof(*work));
        work->work_bufno = idx;
        queput(&queue_free,work);
    }

    dbgprt("queinitall: EXIT\n");
}

// questopall -- tell all receivers/workers to stop cleanly
void
questopall(queue_t *que)
{
    int err;
    xmsg_t xmsg;

    dbgprt("questopall: ENTER que_sym=%s\n",que->que_sym);

    atomic_store(&questop,1);

    for (TSKFORALL(tsk)) {
        memset(&xmsg,0,sizeof(xmsg));

        xmsg.xmsg_stop = 1;
        xmsg.xmsg_msqid = que->que_msqid;

        // send message
        err = msgsnd(quefd,&xmsg,sizeof(xmsg_t) - sizeof(long),0);
        if (err < 0)
            sysfault("questopall: msgsnd fault -- %s\n",strerror(errno));
    }

    dbgprt("questopall: EXIT\n");
}

// tskstart -- start task
void *
tskstart(void *ptr)
{
    tsk_t *tsk = ptr;

    tskcur = tsk;

    ptr = tsk->tsk_fnc(ptr);

    if (tsk->tsk_xfdbg != NULL) {
        fclose(tsk->tsk_xfdbg);
        tsk->tsk_xfdbg = NULL;
    }

    // say we're done
    tskcur->tsk_done = 1;

    return ptr;
}

// tskinit -- initialize
void
tskinit(tsk_t *tsk,u32 xid)
{
    char file[1000];

    tsk->tsk_xid = xid;
    tsk->tsk_rand = rand();

    if (opt_L != NULL) {
        sprintf(file,"%s/tsklog_%4.4d.txt",opt_L,tsk->tsk_xid);
        tsk->tsk_xfdbg = fopen(file,"w");
        if (tsk->tsk_xfdbg != NULL)
            setlinebuf(tsk->tsk_xfdbg);
    }
}

// tskinitall -- initialize all tasks
void
tskinitall(void)
{

    tsklist = calloc(opt_T + 1,sizeof(tsk_t));

    u32 xid = 0;

    tskm = calloc(1,sizeof(tsk_t));
    tskcur = tskm;

    tskinit(tskm,xid++);

    for (TSKFORALL(tsk), ++xid)
        tskinit(tsk,xid);
}

// tsklaunch -- create new task
void
tsklaunch(tsk_t *tsk,void *(*fnc)(void *))
{

    dbgprt("tsklaunch: ENTER tsk_xid=%u\n",tsk->tsk_xid);

    if (tsk->tsk_fnc == NULL)
        tsk->tsk_fnc = fnc;

    pthread_create(&tsk->tsk_tid,NULL,tskstart,tsk);

    dbgprt("tsklaunch: EXIT tsk_xid=%u\n",tsk->tsk_xid);
}

// tsklaunchall -- create/launch all tasks
void
tsklaunchall(void *(*fnc)(void *))
{

    for (TSKFORALL(tsk))
        tsklaunch(tsk,fnc);
}

// tskjoinall -- initialize
void
tskjoinall(void)
{

    dbgprt("tskjoinall: ENTER\n");

    for (TSKFORALL(tsk)) {
        dbgprt("tskjoinall: LOOP tsk_xid=%u\n",tsk->tsk_xid);
        pthread_join(tsk->tsk_tid,NULL);
        if (tsk->tsk_xfdbg != NULL)
            fclose(tsk->tsk_xfdbg);
    }

    dbgprt("tskjoinall: EXIT\n");
}

// worker_thread -- sample worker thread function
void *
worker_thread(void *vptr)
{

    while (1) {
        // get new work to do
        work_t *work = queget(&queue_active,0);

        // process work
        if (work != NULL) {
            switch (work->work_type) {
            case WORK_SUM:
            case WORK_AVG:
            case WORK_FACT:
                work->work_result = rand_r(&tskcur->tsk_rand);
                break;
            case WORK_STOP:
                break;
            }

            // send back results
            queput(&queue_result,work);
        }

        // handle stop request
        if (tskcur->tsk_stop) {
            dbgprt("worker_thread: TSKSTOP/FINAL\n");
            break;
        }
    }

    return (void *) 0;
}

// workresult -- get results from workers
void
workresult(int flags)
{

    dbgprt("workresult: ENTER flags=%8.8X\n",flags);

    while (queue_free.que_count < opt_M) {
        dbgprt("workresult: LOOP queue_active=%d queue_result=%d queue_free=%d\n",
            queue_active.que_count,queue_result.que_count,queue_free.que_count);

        work_t *work = queget(&queue_result,flags);
        if (work == NULL)
            break;

        _dbgprt("master: RESULT work_type=%u work_result=%llu work_xid=%u\n",
            work->work_type,work->work_result,work->work_xid);

        // put request block on free queue
        queput(&queue_free,work);
    }

    dbgprt("workresult: EXIT\n");
}

int
main(int argc,char **argv)
{

    --argc;
    ++argv;

    for (;  argc > 0;  --argc, ++argv) {
        char *cp = *argv;
        if (*cp != '-')
            break;

        cp += 2;
        switch (cp[-1]) {
        case 'L':
            opt_L = (*cp != 0) ? cp : ".";
            break;

        case 'M':
            opt_M = (*cp != 0) ? atoi(cp) : 1;
            break;

        case 'N':
            opt_N = (*cp != 0) ? atoi(cp) : 1;
            break;

        case 'T':
            opt_T = (*cp != 0) ? atoi(cp) : 1;
            break;
        }
    }

    setlinebuf(stdout);
    setlinebuf(stderr);

    tsczero = tscgetf();

    if (opt_T < 1)
        opt_T = 1;

    if (opt_M < opt_T)
        opt_M = opt_T;

    if (opt_N < 0)
        opt_N = 0;

    printf("T=%d M=%d N=%d\n",opt_T,opt_M,opt_N);

    if (opt_L != NULL) {
        char cmd[1000];
        sprintf(cmd,"mkdir -p %s",opt_L);
        system(cmd);
    }

    // initialize all task blocks
    tskinitall();

    // initialize all queues
    queinitall();

    // start all threads
    tsklaunchall(worker_thread);

    for (int workidx = 0;  workidx < opt_N;  ++workidx) {
        dbgprt("main: LOOP workidx=%d\n",workidx);

        work_t *work;
        while (1) {
            workresult(IPC_NOWAIT);
            work = queget(&queue_free,0);
            if (work != NULL)
                break;
        }

        // create the work request
        work->work_type = rand_r(&tskm->tsk_rand) % WORKTYPEMAX;
        work->work_count = rand_r(&tskm->tsk_rand) % 10000;

        // send it to the workers
        queput(&queue_active,work);
    }

    // wait for all requests to be processed
    dbgprt("master: FINAL\n");
    while (queue_free.que_count < opt_M) {
        dbgprt("master: FINLOOP que_count=%d opt_M=%d\n",
            queue_free.que_count,opt_M);
        workresult(0);
    }

    // send stop requests
    questopall(&queue_active);

    // join all tasks
    tskjoinall();

    return 0;
}

Upvotes: 1

Luis Colorado

Reputation: 12668

What you need is a condition variable to create a blocking queue, where the threads wait until there's some work to do... once the queue has been served, the threads go back to it and sleep on the condition variable until the main thread assigns new work. Condition variables are normally used with an auxiliary mutex (the one you already use can be appropriate for that), and the threads check whether the condition is true while holding the mutex. You need the mutex to lock access to the condition, and while a thread waits on the condition variable, the lock is released to allow other threads to access it.

Every time the condition is updated, the thread that makes the update (which also uses the same mutex to access the condition) signals all threads waiting on the condition, waking them so that they reacquire the mutex (this happens atomically) and can check the condition again. The condition must be rechecked, because another thread may have been woken first, checked the condition, made it false again, and gone off to serve the request, releasing the lock only when it finishes.

In pseudocode, it should be something like this.

Threads that wait for the queue to have new jobs:

    mutex_lock();
    while (queue_empty()) { /* this is the condition, in your case */
        wait_on_condition();   /* this releases the mutex lock and reacquires
                                * it again, once the thread is awakened and
                                * able to retest the condition */
    }
    get_task_from_the_queue(); /* this can (depending on how full
                                * the queue is) make the condition
                                * false for another thread */
    mutex_unlock();

The thread that puts tasks on the queue should do the following:

    lock_mutex();
    put_task_on_queue();  /* this makes the queue non-empty */
    signal_all_threads_waiting_in_cond();
    unlock_mutex();

There is sometimes a need to wake all the threads waiting on the queue. This case is very simple, but the condition can be complex, and the single awakened thread may be unable to accept the task, so the task would remain on the queue without any thread taking the lock to get it. For this reason, it is better to wake all the waiting threads and let the winner take the task.

Some implementers write instead:

    mutex_lock();
    if (queue_empty()) { /* BAD BAD BAD BAD BAD */
        wait_on_condition();
    }
    get_task_from_the_queue(); /* incorrect, you don't know if the queue is
                                * empty here */
    mutex_unlock();

because after waiting on the condition, the condition is not checked again, and you can end up with two threads getting the same task from the queue (probably corrupting the queue, or missing a task).
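
Translated into concrete (though minimal) C with POSIX threads, the correct pattern above might look like this; the names and the intrusive list are hypothetical stand-ins:

    #include <pthread.h>
    #include <stddef.h>

    typedef struct task { struct task *next; } task_t;

    static pthread_mutex_t queue_mutex = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
    static task_t *queue_head = NULL;

    // worker side: sleep until the queue is non-empty, then take a task
    task_t *get_task(void) {
        pthread_mutex_lock(&queue_mutex);
        while (queue_head == NULL)               // the condition; must be a while
            pthread_cond_wait(&queue_cond, &queue_mutex);
        task_t *task = queue_head;               // the "winner" gets the task
        queue_head = task->next;
        pthread_mutex_unlock(&queue_mutex);
        return task;
    }

    // producer side: put a task on the queue and wake all waiters
    void put_task(task_t *task) {
        pthread_mutex_lock(&queue_mutex);
        task->next = queue_head;                 // queue becomes non-empty
        queue_head = task;
        pthread_cond_broadcast(&queue_cond);     // wake everyone; losers re-sleep
        pthread_mutex_unlock(&queue_mutex);
    }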

Upvotes: 2

John Bollinger

Reputation: 180316

I would expect that storing the pointer in a local variable guarantees that the optimizing compiler locks the mutex through the local variable and does not, for whatever reason, use the pointer from the handle. Is this expectation correct?

The expectation is correct, provided that the program has no undefined behavior. Among the most significant potential sources of undefined behavior in a multithreaded program are data races, which occur when two different threads both access the same object (the pointer object designated by sync_handle->barrier_mutex, say), at least one of the accesses is a write, and there is a possible execution of the program in which no synchronization action creates a "happens-before" relationship between the accesses. The code presented appears prone to exactly such a data race.

One cannot reason from the C language specification about what a program actually will do under such circumstances, as that's exactly what "undefined behavior" means. However, in practice, a more likely manifestation than the program reading the value of a different object than you intended would be that the racy read results in a different value than you expected.
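
For illustration only, here is a minimal sketch of one way to close that particular race using C11 atomics; the type and function names are simplified stand-ins for the question's code, not a drop-in fix:

    #include <pthread.h>
    #include <stdatomic.h>

    // hypothetical, simplified version of the question's handle, with the
    // shared members made _Atomic so that the main thread's store and the
    // workers' loads synchronize (release/acquire provides the needed
    // happens-before relationship)
    typedef struct {
        _Atomic(pthread_mutex_t *) barrier_mutex;
        atomic_bool terminate;
    } SynchronizationHandle;

    // worker side: the acquire load pairs with the main thread's release store
    static void worker_pass_barrier(SynchronizationHandle *sync_handle) {
        pthread_mutex_t *barrier_mutex =
            atomic_load_explicit(&sync_handle->barrier_mutex, memory_order_acquire);
        pthread_mutex_lock(barrier_mutex);
        pthread_mutex_unlock(barrier_mutex);
    }

    // main-thread side: lock the next mutex, publish it, then release the old one
    static void switch_barrier(SynchronizationHandle *sync_handle,
                               pthread_mutex_t *next, pthread_mutex_t *prev) {
        pthread_mutex_lock(next);
        atomic_store_explicit(&sync_handle->barrier_mutex, next, memory_order_release);
        pthread_mutex_unlock(prev);
    }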

Upvotes: 1
