nikfio

Reputation: 21

Counter value changes unexpectedly in multi-threaded code using pthreads

I'm implementing a fully dynamic Receivers-Reader program: there are a number of Receiver threads (chosen by the user), each connected to a specific client, and a single Reading thread that reads from the Receivers' output queues in temporal order and writes the data to a .log file.

So I connect a client and the first Receiver thread is created. I can't understand why, in the Reading thread, right after an element is read, the counter of the second Receiver (not created yet) increases by one without any instruction that does that. Here are the relevant parts of the code:

Receiver Thread:

printf("Receiver ID: %d, counter: %d\n", rec_arg->receiver_ID, *(rec_arg->counter));
rec_arg->joint_queue[*(rec_arg->tail)] = rec_meas;

pthread_mutex_lock(rec_arg->mtx);
*(rec_arg->tail) = (*(rec_arg->tail) + 1) % QUEUE_SIZE;
(*(rec_arg->counter))++;
pthread_mutex_unlock(rec_arg->mtx);
printf("Receiver ID: %d, counter: %d\n", rec_arg->receiver_ID, *(rec_arg->counter));
printf("Receiver: tail: %d, joint_queue: %p\n", *(rec_arg->tail), (void*) rec_arg->joint_queue);
pthread_cond_signal(rec_arg->notEmpty); 

Reading Thread:

  pthread_mutex_lock(read_arg->queue_mtx[i]);
  (*(read_arg->counters[i]))--;
  printf("Reading Thread: counter[%d]: %d\n", i, *(read_arg->counters[i]));

  if( writetofile(filename, &read_meas) == 0)
       fprintf(stderr,"Reading Thread: writetofile() failed\n");

  *(read_arg->OkRead[i]) = 0;
  *(read_arg->OkRead[(i+1) % read_arg->n_joints]) = 1;
  pthread_mutex_unlock(read_arg->queue_mtx[i]);
  printf("Reading Thread: OkRead[%d]: %d\n", (i+1) % read_arg->n_joints, *(read_arg->OkRead[(i+1) % read_arg->n_joints]));

  printf("Reading Thread: counter[%d]: %d\n", (i+1) % read_arg->n_joints, *(read_arg->counters[(i+1) % read_arg->n_joints])); 

And the output shows:

Reading Thread: counter[0]: 0  
Reading Thread: OkRead[1]: 1  
Reading Thread: counter[1]: 1  
Reading Thread: counter[1]: 1  

More precisely: two threads can be created. When the first client connects, it sends data. The Receiver thread puts the data in its queue; the Reading thread saves the data in a .log file, decrements counter[0], gives the second Receiver the right to be read (OkRead[1] = 1), and then, without any instruction doing so, counter[1] shows an increment: counter[1] = 1.

I hope the problem is clear. Thanks for your attention.

Upvotes: 0

Views: 57

Answers (2)

nikfio

Reputation: 21

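I figured it out. The counters are an array of int pointers, so in main() they have to be declared as a pointer to pointer (int **), and the initializing function needs the address of that variable (an int ***). In my old version, info pointed at the single int *counter in main(), so info[i] for i >= 1 wrote past that variable into unrelated memory, and read_arg->counters[1] ended up referring to something that was never a real counter.

Previously, my code looked like this: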

main():

  int *counter;
  info_init(&counter, n_elem);

  read_arg->counters = &counter;

void info_init(int **info, int n_joints) {

  *info = malloc(n_joints*sizeof(int*));
  if(*info == NULL)
    printf("queues_init: malloc failed, errno: %d, meaning: %s\n", errno, strerror(errno));

  int i;
  for(i=0; i<n_joints; i++) {
    info[i] = (int*) malloc(sizeof(int));   /* bug: for i >= 1 this writes past main()'s counter */
    *(info[i]) = 0;
  }

  return;
}

I solved the problem with:

main():

 int **counter;
 info_init(&counter, n_elem);

 read_arg->counters = counter; 

void info_init(int ***info, int n_joints) {

  /* allocate the array of n_joints pointers and hand it back through *info */
  *info = malloc(n_joints*sizeof(int*));
  if(*info == NULL) {
    printf("queues_init: malloc failed, errno: %d, meaning: %s\n", errno, strerror(errno));
    return;
  }

  int i;
  for(i=0; i<n_joints; i++) {
    (*info)[i] = (int*) malloc(sizeof(int));   /* one int per joint */
    *(*info)[i] = 0;
  }

  return;
}
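
For anyone who wants to try the pattern in isolation, here is a minimal self-contained sketch of the same out-parameter allocation. The names counters_init and counters_free are hypothetical (they are not part of the original program), and the sketch assumes n > 0 and reports failure through a return value instead of printing.

```c
#include <stdlib.h>

/* Hypothetical helper (not from the original program): allocates an array of
 * n pointers, each pointing to its own int initialised to 0, and stores the
 * array in *out. Assumes n > 0. Returns 0 on success, -1 on failure. */
int counters_init(int ***out, int n)
{
    int **arr = malloc((size_t)n * sizeof *arr);
    if (arr == NULL)
        return -1;

    for (int i = 0; i < n; i++) {
        arr[i] = malloc(sizeof *arr[i]);
        if (arr[i] == NULL) {
            /* undo the allocations made so far */
            while (i-- > 0)
                free(arr[i]);
            free(arr);
            return -1;
        }
        *arr[i] = 0;
    }

    *out = arr;   /* the caller's int ** now points at the array */
    return 0;
}

/* Hypothetical counterpart that releases everything counters_init allocated. */
void counters_free(int **arr, int n)
{
    for (int i = 0; i < n; i++)
        free(arr[i]);
    free(arr);
}
```

A caller would declare int **counter, call counters_init(&counter, n_joints), and then pass counter (not &counter) to the thread arguments, as in main() above.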

Now the program works correctly. Thanks @l3x for your help.

Upvotes: 1

P.P

Reputation: 121397

In the Reading thread, you are reading the queue counters outside the mutex lock. That means another thread can modify OkRead[] and counters[] in the meantime. This is a data race, which is undefined behaviour.

Move the call to pthread_mutex_unlock() after reading the values:

  printf("Reading Thread: OkRead[%d]: %d\n", (i+1) % read_arg->n_joints, *(read_arg->OkRead[(i+1) % read_arg->n_joints]));

  printf("Reading Thread: counter[%d]: %d\n", (i+1) % read_arg->n_joints, *(read_arg->counters[(i+1) % read_arg->n_joints]));

  pthread_mutex_unlock(read_arg->queue_mtx[i]);

Or use local variables (private to the thread) that store the values before releasing the mutex, such as:

  ...
  int joints = (i+1) % read_arg->n_joints;
  int okread = *(read_arg->OkRead[joints]);
  int counters = *(read_arg->counters[joints]);

  pthread_mutex_unlock(read_arg->queue_mtx[i]);

  printf("Reading Thread: OkRead[%d]: %d\n", joints, okread);
  printf("Reading Thread: counter[%d]: %d\n", joints, counters);
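
A minimal sketch of the snapshot idea in isolation, under stated assumptions: struct shared_slot, struct slot_snapshot, slot_increment and slot_read are hypothetical names that do not appear in the question's code, and each slot is assumed to be guarded by its own mutex. Under that assumption, the mutex to hold while copying is the one belonging to the slot being read (in the question that would be slot (i+1) % n_joints rather than slot i, if each Receiver uses its own mutex).

```c
#include <pthread.h>

/* Hypothetical shared state: one counter and one flag guarded by one mutex. */
struct shared_slot {
    pthread_mutex_t mtx;
    int counter;
    int ok_read;
};

/* Private copy of the values, safe to use after the mutex is released. */
struct slot_snapshot {
    int counter;
    int ok_read;
};

/* Writer side: every modification happens while holding the slot's mutex. */
void slot_increment(struct shared_slot *s)
{
    pthread_mutex_lock(&s->mtx);
    s->counter++;
    pthread_mutex_unlock(&s->mtx);
}

/* Reader side: copy the shared values under the same mutex, then unlock.
 * The caller works only with the returned copy. */
struct slot_snapshot slot_read(struct shared_slot *s)
{
    struct slot_snapshot snap;

    pthread_mutex_lock(&s->mtx);
    snap.counter = s->counter;
    snap.ok_read = s->ok_read;
    pthread_mutex_unlock(&s->mtx);

    return snap;
}
```

The caller prints from the returned copy after the lock has been released, so printf() never runs while the mutex is held and never touches the shared fields directly.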

Upvotes: 1
