Reputation: 932
The below program processes nbFiles
files using 1 worker thread per GROUPSIZE
files. No more than MAXNBRTHREADS
worker threads are run in parallel. A watchDog()
thread (thread 0) is used to shepherd the PTHREAD_CANCEL_DEFERRED
identical workers. If anyone of the workers fails, it pthread_cond_signal(&errCv)
the watchDog
under the protection of global mutex mtx
, passing its thread ID via the errIndc
predicate. watchDog
then cancels all running threads (global oldest
maintains the ID of the oldest thread still alive to help it do this), and exits the program.
// compile with: gcc -Wall -Wextra -Wconversion -pedantic -std=c99 -g -D_BSD_SOURCE -pthread -o pFiles pFiles.c
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <sys/types.h>
#include <stdint.h>
#include "pthread.h"
#define INDIC_ALL_DONE_OK -1
typedef int_fast32_t int32;
typedef uint_fast32_t uint32;
uint32 MAXNBRTHREADS = 10; // no more than this amount of threads running in parallel
uint32 GROUPSIZE = 10; // how many files per thread
uint32 nbFiles, gThID; // total #files, group ID for a starting thread
int32 errIndc = 0; // global thread error indicator
pthread_t *thT; // pthread table
void **retVals; // thread ret. val. table, needed in stop_watchDog()
uint32 gThCnt; // calculated size of thT[]
uint32 thCnt, oldest; // running threads count (as they are created), oldest thread *alive*
pthread_cond_t errCv = PTHREAD_COND_INITIALIZER; // thread-originated error signal
pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER; // mutex to protect errIndc
// Worker thread
void *processFileGroup(void *arg) {
int32 err;
int last_state, last_type;
uint32 i, thId = (uint32)(intptr_t) arg;
fprintf(stderr, "th %ld started\n", thId);
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &last_state);
pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, &last_type);
// Artificial error in thread 17
if(thId==17) {
pthread_mutex_lock(&mtx);
errIndc = (int32) thId;
pthread_cond_signal(&errCv);
pthread_mutex_unlock(&mtx);
pthread_exit((void *)(intptr_t)err); }
for(i = 0; i < GROUPSIZE ; i++) { // simulate processing GROUPSIZE files
pthread_testcancel();
err = 0;
if(usleep(10000)) { err = 1; break; }
}
//fprintf(stderr, " -- th %ld done with err = %ld\n", thId, err);
if(err!=0) { // Signal watch dog
pthread_mutex_lock(&mtx);
errIndc = (int32) thId;
pthread_cond_signal(&errCv);
pthread_mutex_unlock(&mtx);
pthread_exit((void *)(intptr_t) err);
}
pthread_exit((void *)(intptr_t) err);
}
// Mishap : cancel existing threads, exit program
int32 cancel_exit(int32 rc, int32 faultyThId, char *msg) {
uint32 j; int32 rval;
void *retVal;
if(rc==0) return 0;
if(msg!=NULL && msg[0]=='\0') fprintf(stderr, "\nError in thread %ld. Stoping..\n", faultyThId);
else fprintf(stderr, "\n%s %ld. Stop.\n\n", msg, faultyThId);
for(j = oldest; j < thCnt ; j++) pthread_cancel(thT[j]);
for(j = oldest; j < thCnt ; j++){
pthread_join(thT[j], &retVal); rval = (int)(intptr_t) retVal;
//if(retVal == PTHREAD_CANCELED || rval==115390242)
if(retVal == PTHREAD_CANCELED)
fprintf(stderr, " cexit: thread %ld canceled\n", j);
else fprintf(stderr, " cexit: thread %ld finished, rc = %ld\n", j, rval);
}
pthread_join(thT[4], &retVal); rval = (int)(intptr_t) retVal; fprintf(stderr, " cexit1: thread 4 finished, rc = %ld\n", rval);
fprintf(stderr, "Processing stopped\n\n");
exit(EXIT_FAILURE); return rc;
}
// Watch dog thread
// it fires on signal from one of the running threads about a mishap
void *watchDog(void *arg) {
int32 err;
pthread_mutex_lock(&mtx);
while (errIndc == 0) {
pthread_cond_wait(&errCv,&mtx);
if(errIndc == INDIC_ALL_DONE_OK){ // main() says we're done with no issues
pthread_mutex_unlock(&mtx);
err = 0; pthread_exit((void *)(intptr_t) err);
}
}
pthread_mutex_unlock(&mtx);
fprintf(stderr, "watch dog: stopping on error indication %ld\n", errIndc);
cancel_exit(1, errIndc, "");
exit(EXIT_FAILURE); return arg;// not reached
}
void stop_watchDog() {
pthread_mutex_lock(&mtx);
errIndc = INDIC_ALL_DONE_OK;
pthread_cond_signal(&errCv);
pthread_mutex_unlock(&mtx);
pthread_join(thT[0], &retVals[0]);
}
int main() {
uint32 i, k;
int32 rc;
nbFiles = 950;
gThCnt = 1+nbFiles/GROUPSIZE;
if(gThCnt > MAXNBRTHREADS)
fprintf(stderr, "running max %ld threads in parallel\n", MAXNBRTHREADS);
else fprintf(stderr, "using %ld worker thread(s)\n", gThCnt);
gThCnt++; // account for watchDog (thread 0)
thT = (pthread_t *) calloc(gThCnt, sizeof(pthread_t)); if(thT==NULL) { perror("calloc"); exit(EXIT_FAILURE); }
retVals = (void **) calloc( (nbFiles/GROUPSIZE), sizeof(void *)); if(retVals==NULL) { perror("calloc"); exit(EXIT_FAILURE); }
// Start watch dog
rc = pthread_create(&thT[0], NULL, watchDog, NULL);
if(rc != 0) { fprintf(stderr,"pthread_create() failed for thread 0\n"); exit(EXIT_FAILURE); }
thCnt = 1;
i = 0; oldest = 1;
while(thCnt<gThCnt) {
pthread_mutex_lock(&mtx);
if(errIndc != 0){ // watchDog is already tearing down the whole system, no point in creating more threads
pthread_join(thT[0], &retVals[0]); // wait on WatchDog thread, which never returns (it cancel_exists).
exit(EXIT_FAILURE); // not reached
}
pthread_mutex_unlock(&mtx);
gThID = thCnt;
rc = pthread_create(&thT[thCnt], NULL, processFileGroup, (void *)(intptr_t) gThID);
if(rc != 0) {
fprintf(stderr,"pthread_create() failed for thread %ld\n", thCnt);
stop_watchDog();
cancel_exit(1, (int32)thCnt, "Could not create thread");
}
thCnt++;
if(thCnt>MAXNBRTHREADS) { // wait for the oldest thread to finish
pthread_mutex_lock(&mtx);
if(errIndc != 0) { // watchDog is already tearing down the whole system, he'll report the rc of thread "oldest"
printf("[MAXNBRTHREADS] errIndc=%ld, joining watchDog\n", errIndc);
pthread_join(thT[0], &retVals[0]); // wait on WatchDog thread, which never returns (it cancel_exists).
exit(EXIT_FAILURE); // not reached
}
pthread_mutex_unlock(&mtx);
pthread_join(thT[oldest], &retVals[oldest]); rc = (int)(intptr_t) retVals[oldest];
fprintf(stderr, "[MAXNBRTHREADS] Thread %ld done with rc = %ld\n", oldest, rc);
oldest++;
}
}
k = oldest;
while(k<thCnt) {
pthread_mutex_lock(&mtx);
if(errIndc != 0){ // watchDog is already tearing down the whole system, he'll report the rc of thread k
pthread_join(thT[0], &retVals[0]); // wait on WatchDog thread, which never returns (it cancel_exists).
exit(EXIT_FAILURE); // not reached
}
pthread_mutex_unlock(&mtx);
pthread_join(thT[k], &retVals[k]); rc = (int)(intptr_t) retVals[k];
fprintf(stderr, "Thread %ld done with rc = %ld\n", k, rc);
oldest = ++k;
}
// Signal watch dog to quit
stop_watchDog();
exit(EXIT_SUCCESS);
}
Line 82 causes this program to segfault. Why ? Is it illegal to join a canceled thread ?
If you comment line 82, other issues show up. If you run the program 3 of 4 times you witness one of theses pathological outcomes :
How can thread 11 have two different exit codes ?
..
watch dog: stopping on error indication 17
Error in thread 17. Stoping..
th 19 started
cexit: thread 11 finished, rc = 115390242
[MAXNBRTHREADS] Thread 11 done with rc = -1
Sometimes the program will hang in MAXNBRTHREADS section :
...
[MAXNBRTHREADS] errIndc=17, joining watchDog
There is apparently a race condition in this section; but I couldn't figure it out.
Any help appreciated.
Upvotes: 0
Views: 751
Reputation: 180171
You ask:
Line 82 causes this program to segfault. Why ? Is it illegal to join a canceled thread ?
POSIX does not say that in so many words, but it certainly seems to imply so. The specifications for pthread_join()
say:
The behavior is undefined if the value specified by the thread argument to pthread_join() does not refer to a joinable thread.
and later, in the RATIONALE,
If an implementation detects use of a thread ID after the end of its lifetime, it is recommended that the function should fail and report an [ESRCH] error.
The segfault you observe is not consistent with the (non-normative) recommendation in the rationale, but the rationale does support the proposition that a thread is no longer a "joinable thread" after its lifetime has ended (e.g. because it has been canceled), for otherwise the recommendation would be inconsistent with the function's specified behavior. Certainly threads that have already been joined are no longer joinable, though the reason to use "joinable" instead of "live" or similar is probably more the provisions for detaching threads.
How can thread 11 have two different exit codes ?
It can't, and your output does not demonstrate otherwise. You are joining thread 11 twice, so at least one of those pthread_join()
calls must fail. In the event that it does, you cannot rely on any result value that it might have stored (not based on POSIX, anyway). You ought to check the return values of your function calls for error flags.
Sometimes the program will hang in MAXNBRTHREADS section
Yes, it appears that it could do.
The idea here appears to be that in the failure case, the main thread will call stop_watchDog()
, which will set a flag to advise the watchdog thread that it should stop, and then signal the condition variable to make the watchdog wake up and notice it. When it does wake up, the watchdog thread must re-acquire mutex mtx
before it can return from pthread_cond_wait()
.
After returning from stop_watchDog()
, the main thread locks mutex mtx
and
attempts to join the watchdog thread. But signaling a CV is not synchronous. It is therefore possible that the main thread locks the mutex before the watchdog thread reacquires it, in which case you will deadlock: the watchdog cannot return from pthread_cond_wait()
and proceed to terminate until it acquires the mutex, but the main thread will not unlock the mutex until the watchdog terminates.
I haven't analyzed the program enough to be sure exactly what state the main thread needs to protect there, though it appears to include at least the errIndc
variable. Any way around, however, it does not appear to need to hold the mutex locked while trying to join the watchdog thread.
Upvotes: 1