Manuel Selva
Manuel Selva

Reputation: 19050

How to properly suspend threads?

In the context of an existing multithreaded application I want to modify it to be able to suspend threads. The application is made of 3 working threads that work in "lock steps" using a pthread_barrier as following :

Thread 1   Thread 2   Thread 3

while(1)   while(1)    while(1)
   |          |          |
   |          |          |
   |          |          |
   |          |          |
 barrier     barrier   barrier

All is fine with this code. I now add a 4th thread used to control the 3 other ones, and I need to suspend/resume the 3 working threads from it. For now I tried to use a global stop flag and a condition variable written by the control thread and read by the working thread after the barrier.

Thread 1     Thread 2       Thread 3        Thread 4

while(1)     while(1)        while(1)        wait for user input to suspend 
   |            |              |             mutex_lock
   |            |              |             stop = 1
   |            |              |             mutex_unlock
   |            |              |             wait for user input to resume 
   |            |              |             mutex_lock
   |            |              |             stop = 0
   |            |              |             cond_broadcast()
   |            |              |             mutex_unlock
 barrier       barrier       barrier
 mutex_lock    mutex_lock    mutex_lock
 if(stop)      if(stop)      if(stop)
  cond_wait()   cond_wait()   cond_wait()
 mutex_unlock  mutex_unlock  mutex_unlock

The problem with this solution is that it can sometimes deadlock depending on the scheduling of the threads and the working length of threads 1, 2 and 3. I am thus wondering how can I successfully synchronize these 4 threads to be able to suspend/resume the working threads from the control one ?

Upvotes: 2

Views: 2375

Answers (3)

Nominal Animal
Nominal Animal

Reputation: 39298

I believe gmch's answer should solve the original question. However, not all pthread implementations include pthread_barrier_t and the related functions (as they are an optional part of the POSIX threads specs), so here is the custom barrier implementation I mentioned in a comment to the original question.

(Note that there are other ways to suspend/resume threads asynchronously, during normal operation, and without co-operation from the threads themselves. One way to implement that is to use one or two realtime signals, and a signal handler that blocks in sigsuspend(), waiting for the complementary "continue" signal. The controlling thread will have to use pthread_kill() or pthread_sigqueue() to send the pausing and continuing signals to each thread involved. The threads are minimally affected; aside from possible EINTR errors from blocking syscalls (as signal delivery interrupts blocking syscalls), the threads just don't do any progress -- just as if they weren't scheduled for a while. Because of that, there should not be any issues with respect to the threads getting paused and continued at slightly different times. If you are interested in this method, leave a comment, and I could try and show an example implementation of that, too.)

Perhaps this will be of use to someone else needing a pause-able custom barrier implementation, or perhaps as a basis of their own custom barrier.

Edited to add DRAINING mode, when threads are expected to quit. In your worker loop, use do { ... } while (!barrier_wait(&barrier));

barrier.h:

#ifndef   BARRIER_H
#define   BARRIER_H
#include <pthread.h>
#include <errno.h>

typedef enum {
    INVALID = -1,
    RUNNING = 0,
    PAUSED = 1,
    DRAINING = 2
} barrier_state;

typedef struct {
    pthread_mutex_t     mutex;
    pthread_cond_t      cond;
    barrier_state       state;
    int                 threads;    /* Number of participants */
    int                 waiting;    /* Number of participants waiting */
} barrier;

/** barrier_drain() - Mark barrier so that threads will know to exit
 * @b: pointer to barrier
 * @ids: pthread_t's for the threads to wait on, or NULL
 * @retvals: return values from the threads, or NULL
 * This function marks the barrier such that all threads arriving
 * at it will return ETIMEDOUT.
 * If @ids is specified, the threads will be joined.
 * Returns 0 if successful, errno error code otherwise.
*/
static int barrier_drain(barrier *const b, pthread_t *const ids, void **const retvals)
{
    int   result, threads;
    void *retval;

    if (!b || b->threads < 0)
        return errno = EINVAL;

    result = pthread_mutex_lock(&b->mutex);
    if (result)
        return errno = result;

    b->state = DRAINING;
    pthread_cond_broadcast(&b->cond);

    threads = b->threads;
    b->threads = 0;

    pthread_mutex_unlock(&b->mutex);

    while (threads-->0) {
        result = pthread_join(ids[threads], &retval);
        if (result)
            return errno = result;
        if (retvals)
            retvals[threads] = retval;
    }

    return errno = 0;
}            

/** barrier_pause() - Mark barrier to pause threads in the barrier
 * @b: pointer to barrier
 * This function marks the barrier such that all threads arriving
 * in it will wait in the barrier, until barrier_continue() is
 * called on it. If barrier_continue() is called before all threads
 * have arrived on the barrier, the barrier will operate normally;
 * i.e. the threads will continue only when all threads have arrived
 * at the barrier.
 * Returns 0 if successful, errno error code otherwise.
*/
static int barrier_pause(barrier *const b)
{
    int result;

    if (!b || b->threads < 1)
        return errno = EINVAL;

    result = pthread_mutex_lock(&b->mutex);
    if (result)
        return errno = result;

    if (b->state != PAUSED && b->state != RUNNING) {
        pthread_mutex_unlock(&b->mutex);
        return errno = EPERM;
    }

    b->state = PAUSED;
    pthread_mutex_unlock(&b->mutex);
    return errno = 0;
}

/** barrier_continue() - Unpause barrier
 * @b: Pointer to barrier
 * This function lets the barrier operate normally.
 * If all threads are already waiting in the barrier,
 * it lets them proceed immediately. Otherwise, the
 * threads will continue when all threads have arrived
 * at the barrier.
 * Returns 0 if success, errno error code otherwise.
*/
static int barrier_continue(barrier *const b)
{
    int result;

    if (!b || b->threads < 0)
        return errno = EINVAL;

    result = pthread_mutex_lock(&b->mutex);
    if (result)
        return errno = result;

    if (b->state != PAUSED) {
        pthread_mutex_unlock(&b->mutex);
        return errno = EPERM;
    }

    b->state = RUNNING;

    if (b->waiting >= b->threads)
        pthread_cond_broadcast(&b->cond);

    pthread_mutex_unlock(&b->mutex);

    return errno = 0;
}

/** barrier_wait() - Wait on the barrier
 * @b: Pointer to barrier
 * Each thread participating in the barrier
 * must call this function.
 * Callers will block (wait) in this function,
 * until all threads have arrived.
 * If the barrier is paused, the threads will
 * wait until barrier_continue() is called on
 * the barrier, otherwise they will continue
 * when the final thread arrives to the barrier.
 * Returns 0 if success, errno error code otherwise.
 * Returns ETIMEDOUT if the thread should exit.
*/
static int barrier_wait(barrier *const b)
{
    int result;

    if (!b || b->threads < 0)
        return errno = EINVAL;

    result = pthread_mutex_lock(&b->mutex);
    if (result)
        return errno =result;

    if (b->state == INVALID) {
        pthread_mutex_unlock(&b->mutex);
        return errno = EPERM;
    } else
    if (b->state == DRAINING) {
        pthread_mutex_unlock(&b->mutex);
        return errno = ETIMEDOUT;
    }

    b->waiting++;

    if (b->state == RUNNING && b->waiting >= b->threads)
        pthread_cond_broadcast(&b->cond);
    else
        pthread_cond_wait(&b->cond, &b->mutex);

    b->waiting--;
    pthread_mutex_unlock(&b->mutex);

    return errno = 0;
}

/** barrier_destroy() - Destroy a previously initialized barrier
 * @b: Pointer to barrier
 * Returns zero if success, errno error code otherwise.
*/
static int barrier_destroy(barrier *const b)
{
    int result;

    if (!b || b->threads < 0)
        return errno = EINVAL;

    b->state = INVALID;
    b->threads = -1;
    b->waiting = -1;

    result = pthread_cond_destroy(&b->cond);
    if (result)
        return errno = result;

    result = pthread_mutex_destroy(&b->mutex);
    if (result)
        return errno = result;

    return errno = 0;
}

/** barrier_init() - Initialize a barrier
 * @b: Pointer to barrier
 * @threads: Number of threads to participate in barrier
 * Returns 0 if success, errno error code otherwise.
*/
static int barrier_init(barrier *const b, const int threads)
{
    int result;

    if (!b || threads < 1)
        return errno = EINVAL;

    result = pthread_mutex_init(&b->mutex, NULL);
    if (result)
        return errno = result;

    result = pthread_cond_init(&b->cond, NULL);
    if (result)
        return errno = result;

    b->state = RUNNING;
    b->threads = threads;
    b->waiting = 0;

    return errno = 0;
}

#endif /* BARRIER_H */

The logic is quite simple. All threads waiting in the barrier wait on the cond condition variable. If the barrier operates normally (state==RUNNING), the final thread arriving at the barrier will broadcast on the condition variable instead of waiting on it, thus waking up all other threads.

If the barrier is paused (state==PAUSED), even the final thread arriving at the barrier will wait on the condition variable.

When barrier_pause() is called, the barrier state is changed to paused. There may be zero or more threads waiting on the condition variable, and that is okay: only the final thread arriving at the barrier has a special role, and that thread cannot have yet arrived. (If it had, it'd have emptied the barrier already.)

When barrier_continue() is called, the barrier state is changed to normal (state==RUNNING). If all threads are waiting on the condition variable, they are released by broadcasting on the condition variable. Otherwise, the final thread arriving at the barrier will broadcast on the condition variable and release the waiting threads normally.

Note that barrier_pause() and barrier_continue() do not wait for the barrier to become full or to drain. It only blocks on the mutex, and the functions only hold it for very short periods at a time. (In other words, they may block for a short time, but will not wait for the barrier to reach any specific situation.)

If the barrier is draining (state==DRAINING), threads arriving at the barrier return immediately with errno==ETIMEDOUT. For simplicity, all the barrier functions now unconditionally set errno (to 0 if success, errno code if error, ETIMEDOUT if draining).

The mutex protects the barrier fields so that only one thread may access the fields at once. In particular, only one thread can arrive at the barrier at the same time, due to the mutex.

One complicated situation exists: the loop body the barrier is used in might be so short, or there might be so many threads, that threads start arriving at the next iteration of the barrier even before all threads from the previous iteration have left it.

According to POSIX.1-2004, pthread_cond_broadcast() "shall unblock all threads currently blocking on the specified condition variable". Even though their wakeups will be sequential -- as each one will acquire the mutex in turn --, only those threads that were blocked on it when pthread_cond_broadcast() was called will be woken up.

So, if the implementation follows POSIX semantics with respect to condition variables, woken threads can (even immediately!) re-wait on the condition variable, waiting for the next broadcast or signal: the "old" and "new" waiters are separate sets. This use case is actually quite typical, and all POSIX implementations I've heard of do allow that -- they do not wake up threads that started waiting on the condition variable after the last pthread_cond_broadcast().

If we can rely on POSIX condition variable wakeup semantics, it means the above barrier implementation should work reliably, including in the case where threads arrive at the barrier (for the next iteration), even before all threads (from the previous iteration) have left the barrier.

(Note that the known "spurious wakeups" issue only affects pthread_cond_signal(); i.e. when calling pthread_cond_signal() more than one thread may be woken up. Here, we wake up all threads using pthread_cond_broadcast(). We rely on it only waking current waiters, and not any future waiters.)


Here is a POSIX.1-2001 implementation for suspending and resuming threads asynchronously, without any co-operation from the target thread(s).

This uses two signals, one for suspending a thread, and another for resuming it. For maximum compatibility, I did not use GNU C extensions or POSIX.1b realtime signals. Both signals save and restore errno, so that the impact to the suspended threads would be minimal.

Note, however, that the functions listed in man 7 signal, "Interruption of system calls and library functions by signal handlers" section, after the "The following interfaces are never restarted after being interrupted by a signal handler" paragraph, will return errno==EINTR when suspended/resumed. This means you will have to use the traditional do { result = FUNCTION(...); } while (result == -1 && errno == EINTR); loop, instead of just result = FUNCTION(...);.

The suspend_threads() and resume_threads() calls are not synchronous. The threads will be suspended/resumed either before, or sometime after, the function calls return. Also, suspend and resume signals sent from outside the process itself may affect the threads; it depends on if the kernel uses one of the target threads to deliver such signals. (This approach cannot ignore signals sent by other processes.)

Testing indicates that in practice, this suspend/resume functionality is quite reliable, assuming no outside interference (by sending signals caught by the target threads from another process). However, it is not very robust, and there are very few guarantees on its operation, but it might suffice for some implementations.

suspend-resume.h:

#ifndef   SUSPEND_RESUME_H
#define   SUSPEND_RESUME_H

#if !defined(_POSIX_C_SOURCE) && !defined(POSIX_SOURCE)
#error This requires POSIX support (define _POSIX_C_SOURCE).
#endif

#include <signal.h>
#include <errno.h>
#include <pthread.h>

#define   SUSPEND_SIGNAL  SIGUSR1
#define   RESUME_SIGNAL   SIGUSR2

/* Resume signal handler.
*/
static void resume_handler(int signum, siginfo_t *info, void *context)
{
    /* The delivery of the resume signal is the key point.
     * The actual signal handler does nothing. */
    return;
}

/* Suspend signal handler.
*/
static void suspend_handler(int signum, siginfo_t *info, void *context)
{
    sigset_t  resumeset;
    int       saved_errno;

    if (!info || info->si_signo != SUSPEND_SIGNAL)
        return;

    /* Save errno to keep it unchanged in the interrupted thread. */
    saved_errno = errno;

    /* Block until suspend or resume signal received. */
    sigfillset(&resumeset);
    sigdelset(&resumeset, SUSPEND_SIGNAL);
    sigdelset(&resumeset, RESUME_SIGNAL);
    sigsuspend(&resumeset);

    /* Restore errno. */
    errno = saved_errno; 
}

/* Install signal handlers.
*/
static int init_suspend_resume(void)
{
    struct sigaction act;

    sigemptyset(&act.sa_mask);
    sigaddset(&act.sa_mask, SUSPEND_SIGNAL);
    sigaddset(&act.sa_mask, RESUME_SIGNAL);
    act.sa_flags = SA_RESTART | SA_SIGINFO;

    act.sa_sigaction = resume_handler;
    if (sigaction(RESUME_SIGNAL, &act, NULL))
        return errno;

    act.sa_sigaction = suspend_handler;
    if (sigaction(SUSPEND_SIGNAL, &act, NULL))
        return errno;

    return 0;
}

/* Suspend one or more threads.
*/
static int suspend_threads(const pthread_t *const identifier, const int count)
{
    int i, result, retval = 0;

    if (!identifier || count < 1)
        return errno = EINVAL;

    for (i = 0; i < count; i++) {
        result = pthread_kill(identifier[i], SUSPEND_SIGNAL);
        if (result && !retval)
            retval = result;
    }

    return errno = retval;
}

/* Resume one or more threads.
*/
static int resume_threads(const pthread_t *const identifier, const int count)
{
    int i, result, retval = 0;

    if (!identifier || count < 1)
        return errno = EINVAL;

    for (i = 0; i < count; i++) {
        result = pthread_kill(identifier[i], RESUME_SIGNAL);
        if (result && !retval)
            retval = result;
    }

    return errno = retval;
}

#endif /* SUSPEND_RESUME_H */

Questions?

Upvotes: 1

user3793679
user3793679

Reputation:

To keep the threads in sync, you should put the test for stop before the barrier. If the flag is set while one or more of the working threads have reached the barrier, then they are held until the other(s) are released from the condition.


Added following the exchange of comments below...

With the check of the stop flag after the barrier, there is a race. Immediately after the barrier, the working threads check the flag in turn. If the flag is set just after one or more threads have checked it, but before the next one(s), some threads will miss the stop, and continue round to the barrier -- so the working threads are now out of sync.

With the check of the stop flag before the barrier, there is still a race, but it does not cause the working threads to get out of sync. If the flag is set just after one or more threads have checked it, the ones that miss it proceed, and stop at the barrier. Any threads that see the stop flag will stop on the condition, and when the condition is signalled, they will proceed to the barrier and all the working threads continue in sync.

To put this another way... with the check after the barrier, all the working threads need to see the same state of the stop flag after they are brought into sync by the barrier, if they are to stay in sync -- which is impossible. With the check before the barrier, only one working thread needs to see the stop flag in order to stop them in sync -- which is straightforward.

From the sketch of the code show, it's not clear why there would be a deadlock. Moving the check round does not change that, but perhaps the reported deadlock is caused by the working threads getting out of sync.


Separately and FWIW, generally one would write:

while (...reason to wait...)
  pthread_cond_wait(...) ;

Rather than:

if (...reason to wait...)
  pthread_cond_wait(...) ;

This is mostly because pthread_cond_signal() can (the standard allows it to) wake up more than one thread, and in this case pthread_cond_broadcast is being used... but an if keeps ringing alarm bells.

Upvotes: 1

skuanr
skuanr

Reputation: 48

You Can use signal handler to suspend and resume a thread depending on which signal is delivered to the thread. Write two custom signal handlers: one for suspend(SIGUSR1) and resume(SIGUSR2). So when you want to suspend a thread just send SIGUSR1 signal to that thread. Similarly for resuming a suspended thread send SIGUSR2 to that thread using pthread_kill.

Upvotes: 0

Related Questions