Reputation: 591
I had an exercise about threads, locks, and condition variables in C. I needed to write a program that gets data, turns it into a linked list, starts 3 threads that each calculate a result for a node in the list, and has the main thread print the results after everyone has finished.
This is the main function:
int thread_finished_count;
// Lock and Conditional variable
pthread_mutex_t list_lock;
pthread_mutex_t thread_lock;
pthread_cond_t thread_cv;
int main(int argc, char const *argv[])
{
    node *list;
    int pairs_count, status;
    thread_finished_count = 0;
    /* get the data and start the threads */
    node *head = create_numbers(argc, argv, &pairs_count);
    list = head; // backup head for results
    pthread_t *threads = start_threads(&list);
    /* wait for threads and destroy lock */
    status = pthread_cond_wait(&thread_cv, &list_lock);
    chcek_status(status);
    status = pthread_mutex_destroy(&list_lock);
    chcek_status(status);
    status = pthread_mutex_destroy(&thread_lock);
    chcek_status(status);
    /* print result in original list */
    print_results(head);
    /* cleanup */
    wait_for_threads(threads, NUM_THREADS);
    free_list(head);
    free(threads);
    return EXIT_SUCCESS;
}
Please note that the create_numbers function is working properly, and the list is working as intended.
Here is the start_threads and thread_function code:
pthread_t *start_threads(node **list)
{
    int status;
    pthread_t *threads = (pthread_t *)malloc(sizeof(pthread_t) * NUM_THREADS);
    check_malloc(threads);
    for (int i = 0; i < NUM_THREADS; i++)
    {
        status = pthread_create(&threads[i], NULL, thread_function, list);
        chcek_status(status);
    }
    return threads;
}
void *thread_function(node **list)
{
    int status, self_id = pthread_self();
    printf("im in %u\n", self_id);
    node *currentNode;
    while (1)
    {
        if (!(*list))
            break;
        status = pthread_mutex_lock(&list_lock);
        chcek_status(status);
        printf("list location %p thread %u\n", *list, self_id);
        if (!(*list))
        {
            status = pthread_mutex_unlock(&list_lock);
            chcek_status(status);
            break;
        }
        currentNode = (*list);
        (*list) = (*list)->next;
        status = pthread_mutex_unlock(&list_lock);
        chcek_status(status);
        currentNode->gcd = gcd(currentNode->num1, currentNode->num2);
        status = usleep(10);
        chcek_status(status);
    }
    status = pthread_mutex_lock(&thread_lock);
    chcek_status(status);
    thread_finished_count++;
    status = pthread_mutex_unlock(&thread_lock);
    chcek_status(status);
    if (thread_finished_count != 3)
        return NULL;
    status = pthread_cond_signal(&thread_cv);
    chcek_status(status);
    return NULL;
}
void chcek_status(int status)
{
    if (status != 0)
    {
        fputs("pthread_function() error\n", stderr);
        exit(EXIT_FAILURE);
    }
}
Note that self_id is used for debugging purposes.
Please note!: I considered the possibility that the first thread is created and, by the time the second thread is created, the first one has already finished all the jobs. That is why I added the "I'm in #threadID" print and the usleep(10) when every thread is created. They all come in, but only the first one does all the jobs. Here is the output example if I do the usleep after the mutex unlock (notice the different thread IDs):
with usleep
./v2 nums.txt
im in 1333593856
list location 0x7fffc4fb56a0 thread 1333593856
im in 1316685568
im in 1325139712
list location 0x7fffc4fb56c0 thread 1333593856
list location 0x7fffc4fb56e0 thread 1316685568
list location 0x7fffc4fb5700 thread 1325139712
list location 0x7fffc4fb5720 thread 1333593856
list location 0x7fffc4fb5740 thread 1316685568
list location 0x7fffc4fb5760 thread 1325139712
list location 0x7fffc4fb5780 thread 1333593856
list location 0x7fffc4fb57a0 thread 1316685568
list location 0x7fffc4fb57c0 thread 1325139712
list location 0x7fffc4fb57e0 thread 1333593856
list location 0x7fffc4fb5800 thread 1316685568
list location (nil) thread 1325139712
list location (nil) thread 1333593856
...
normal result output
...
And that's the output if I comment out the usleep after the mutex unlock (notice the same thread ID):
without usleep
./v2 nums.txt
im in 2631730944
list location 0x7fffe5b946a0 thread 2631730944
list location 0x7fffe5b946c0 thread 2631730944
list location 0x7fffe5b946e0 thread 2631730944
list location 0x7fffe5b94700 thread 2631730944
list location 0x7fffe5b94720 thread 2631730944
list location 0x7fffe5b94740 thread 2631730944
list location 0x7fffe5b94760 thread 2631730944
list location 0x7fffe5b94780 thread 2631730944
list location 0x7fffe5b947a0 thread 2631730944
list location 0x7fffe5b947c0 thread 2631730944
list location 0x7fffe5b947e0 thread 2631730944
list location 0x7fffe5b94800 thread 2631730944
im in 2623276800
im in 2614822656
...
normal result output
...
My goal is that each thread takes an element, does the calculation, and in the meantime another thread comes in and takes another element, so that a different thread handles each element (or at least close to that).
Thanks for reading, and I appreciate your help.
Upvotes: 1
Views: 1919
Reputation: 1772
I have looked deeper into how glibc (v2.30 on Linux & x86_64, at least) implements pthread_mutex_lock() and pthread_mutex_unlock().
It turns out that _lock() works something like this:
if (atomic_cmp_xchg(mutex->lock, 0, 1))
    return <OK> ;                       // mutex->lock was 0, is now 1
while (1)
{
    if (atomic_xchg(mutex->lock, 2) == 0)
        return <OK> ;                   // mutex->lock was 0, is now 2
    ...do FUTEX_WAIT(2)...              // suspend thread iff mutex->lock == 2...
} ;
And _unlock() works something like this:
if (atomic_xchg(mutex->lock, 0) == 2)   // set mutex->lock == 0
    ...do FUTEX_WAKE(1)...              // if may have waiter(s) start 1
Now:
mutex->lock: 0 => unlocked, 1 => locked-but-no-waiters, 2 => locked-with-waiter(s)
'locked-but-no-waiters' optimizes for the case where there is no lock contention and there is no need to do FUTEX_WAKE in _unlock().
the _lock()/_unlock() functions are in the library -- they are not in the kernel.
...in particular, the ownership of the mutex is a matter for the library, not the kernel.
FUTEX_WAIT(2) is a call to the kernel, which will place the thread on a pending queue associated with the mutex, unless mutex->lock != 2.
The kernel checks for mutex->lock == 2 and adds the thread to the queue atomically. This deals with the case of _unlock() being called after the atomic_xchg(mutex->lock, 2).
FUTEX_WAKE(1) is also a call to the kernel, and the futex man page tells us:
FUTEX_WAKE (since Linux 2.6.0)
This operation wakes at most 'val' of the waiters that are waiting ... No guarantee is provided about which waiters are awoken (e.g., a waiter with a higher scheduling priority is not guaranteed to be awoken in preference to a waiter with a lower priority).
where 'val' in this case is 1.
Although the documentation says "no guarantee about which waiters are awoken", the queue appears to be at least FIFO.
Note especially that:
_unlock() does not pass the mutex to the thread started by the FUTEX_WAKE.
once woken up, the thread will again try to obtain the lock...
...but may be beaten to it by any other running thread -- including the thread which just did the _unlock().
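Putting the pieces together, here is a compilable sketch of the algorithm described above, using C11 atomics and the raw futex system call. This is my own illustration of the scheme, not glibc's actual code; futex_mutex, futex_lock() and futex_unlock() are invented names:
#define _GNU_SOURCE
#include <stdatomic.h>
#include <stdint.h>
#include <unistd.h>
#include <sys/syscall.h>
#include <linux/futex.h>

/* 0 => unlocked, 1 => locked-but-no-waiters, 2 => locked-with-waiter(s) */
typedef struct { _Atomic uint32_t lock; } futex_mutex;

static void futex_lock(futex_mutex *m)
{
    uint32_t expected = 0;
    /* fast path: 0 -> 1, we own the mutex and nobody is waiting */
    if (atomic_compare_exchange_strong(&m->lock, &expected, 1))
        return;
    /* slow path: mark the mutex contended (2) and sleep while it stays 2 */
    while (atomic_exchange(&m->lock, 2) != 0)
        syscall(SYS_futex, &m->lock, FUTEX_WAIT, 2, NULL, NULL, 0);
}

static void futex_unlock(futex_mutex *m)
{
    /* if the old value was 2 there may be waiters: wake at most one of them */
    if (atomic_exchange(&m->lock, 0) == 2)
        syscall(SYS_futex, &m->lock, FUTEX_WAKE, 1, NULL, NULL, 0);
}
Note that futex_unlock() simply stores 0 and (maybe) wakes one waiter; nothing hands the mutex to the woken thread, so any running thread -- including the one which just unlocked -- can take it again via the fast path first.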
I believe this is why you have not seen the work being shared across the threads. There is so little work for each one to do, that a thread can unlock the mutex, do the work and be back to lock the mutex again before a thread woken up by the unlock can get going and succeed in locking the mutex.
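If you want to see this for yourself, a small standalone experiment along these lines (entirely my own, not code from the question) can show the skew: each of three threads repeatedly takes one "item" under a mutex, and the per-thread counts are printed at the end. Depending on your scheduler, one thread may take nearly all of them unless you uncomment the sched_yield():
#include <pthread.h>
#include <sched.h>
#include <stdio.h>

#define NTHREADS 3
#define NITEMS   100000

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static long remaining = NITEMS;
static long taken[NTHREADS];

static void *worker(void *arg)
{
    long id = (long)arg;
    for (;;)
    {
        pthread_mutex_lock(&lock);
        if (remaining == 0) { pthread_mutex_unlock(&lock); break; }
        remaining--;
        pthread_mutex_unlock(&lock);
        taken[id]++;          /* count how many "items" this thread got */
        /* sched_yield(); */  /* uncomment to give other threads a chance */
    }
    return NULL;
}

int main(void)
{
    pthread_t t[NTHREADS];
    for (long i = 0; i < NTHREADS; i++)
        pthread_create(&t[i], NULL, worker, (void *)i);
    for (int i = 0; i < NTHREADS; i++)
        pthread_join(t[i], NULL);
    for (int i = 0; i < NTHREADS; i++)
        printf("thread %d took %ld items\n", i, taken[i]);
    return 0;
}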
Upvotes: 0
Reputation: 1772
First, you are doing the gcd() work while holding the lock... so (a) only one thread will do any work at any one time, though (b) that does not entirely explain why only one thread appears to do (nearly) all the work -- as KamilCuk says, it may be that there is so little work to do, that it's (nearly) all done before the second thread wakes up properly. [More exotic, there may be some latency between thread 'a' unlocking the mutex and another thread starting to run, such that thread 'a' can acquire the mutex before another thread gets there.]
POSIX says that when a mutex is unlocked, if there are waiters then "the scheduling policy shall determine which thread shall acquire the mutex". The default "scheduling policy" is (to the best of my knowledge) implementation defined.
You could try a couple of things: (1) use a pthread_barrier_t to hold all the threads at the start of thread_function() until they are all running; (2) use sched_yield(void) after pthread_mutex_unlock() to prompt the system into running the newly runnable thread.
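By way of illustration, here is a sketch of how those two suggestions might be grafted onto the question's thread_function(). The start_barrier variable is my own addition; the node type, list_lock, gcd() and NUM_THREADS are assumed from the question, and error checking is omitted for brevity:
#include <pthread.h>
#include <sched.h>

static pthread_barrier_t start_barrier;  /* added; initialise once in main():
                                            pthread_barrier_init(&start_barrier,
                                                                 NULL, NUM_THREADS); */

void *thread_function(node **list)
{
    /* (1) hold every worker here until all NUM_THREADS of them are running */
    pthread_barrier_wait(&start_barrier);

    while (1)
    {
        pthread_mutex_lock(&list_lock);
        if (!(*list))
        {
            pthread_mutex_unlock(&list_lock);
            break;
        }
        node *currentNode = *list;
        *list = (*list)->next;
        pthread_mutex_unlock(&list_lock);

        /* (2) invite the scheduler to run another (possibly just-woken) thread */
        sched_yield();

        currentNode->gcd = gcd(currentNode->num1, currentNode->num2);
    }
    /* ...increment the finished count and signal, as discussed below... */
    return NULL;
}
Whether sched_yield() actually hands the mutex to another thread is still up to the scheduler; it only makes that more likely.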
Second, you should not under any circumstance treat a 'condition variable' as a signal. For main() to know that all threads have finished you need a count -- which could be a pthread_barrier_t; or it could be a simple integer, protected by a mutex, with a 'condition variable' to hold the main thread on while it waits; or it could be a count (in main()) and a semaphore (posted once by each thread as it exits).
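To illustrate the last variant, here is a self-contained sketch using a POSIX unnamed semaphore. done_sem and worker() are made-up names; the real worker body would be the list processing from the question:
#include <pthread.h>
#include <semaphore.h>

#define NUM_THREADS 3

static sem_t done_sem;                    /* hypothetical name */

static void *worker(void *arg)
{
    /* ...take items off the list and compute, as in the question... */
    sem_post(&done_sem);                  /* announce "I have finished" */
    return NULL;
}

int main(void)
{
    pthread_t t[NUM_THREADS];
    sem_init(&done_sem, 0, 0);            /* starts at 0: nothing finished yet */
    for (int i = 0; i < NUM_THREADS; i++)
        pthread_create(&t[i], NULL, worker, NULL);
    for (int i = 0; i < NUM_THREADS; i++)
        sem_wait(&done_sem);              /* returns once per finished worker */
    /* all workers have finished: safe to print results here */
    for (int i = 0; i < NUM_THREADS; i++)
        pthread_join(t[i], NULL);
    sem_destroy(&done_sem);
    return 0;
}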
Third, you show pthread_cond_wait(&cv, &lock); in main(). At that point main() must own lock... and it matters when that happened. But: as it stands, the first thread to find the list empty will kick cv, and main() will proceed, even though other threads are still running. Though once main() does re-acquire lock, any threads which are then still running will either be exiting or will be stuck on the lock. (It's a mess.)
In general, the template for using a 'condition variable' is:
pthread_mutex_lock(&...lock) ;
while (!(... thing we need ...))
    pthread_cond_wait(&...cond_var, &...lock) ;
... do stuff now we have what we need ....
pthread_mutex_unlock(&...lock) ;
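The signalling side follows the mirror-image template: change the shared state and signal under the same mutex (signalling after the unlock is also legal, but signalling under the mutex is the simpler pattern to reason about):
pthread_mutex_lock(&...lock) ;
... change the thing the waiter is waiting for ...
pthread_cond_signal(&...cond_var) ;     // or pthread_cond_broadcast()
pthread_mutex_unlock(&...lock) ;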
NB: a 'condition variable' does not have a value... despite the name, it is not a flag to signal that some condition is true. A 'condition variable' is, essentially, a queue of threads waiting to be re-started. When a 'condition variable' is signaled, at least one waiting thread will be re-started -- but if there are no threads waiting, nothing happens; in particular, the (so-called) 'condition variable' retains no memory of the signal.
In the new code, following the above template, main() should:
/* wait for threads .... */
status = pthread_mutex_lock(&thread_lock);
chcek_status(status);
while (thread_finished_count != 3)
{
    status = pthread_cond_wait(&thread_cv, &thread_lock) ;
    chcek_status(status);
} ;
status = pthread_mutex_unlock(&thread_lock) ;
chcek_status(status);
So what is going on here?
main() is waiting for thread_finished_count == 3
thread_finished_count is a shared variable "protected" by the thread_lock mutex.
...so it is incremented in thread_function() under the mutex.
...and main() must also read it under the mutex.
if main() finds thread_finished_count != 3 it must wait.
to do that it does: pthread_cond_wait(&thread_cv, &thread_lock), which:
unlocks thread_lock
places the thread on the thread_cv queue of waiting threads.
and it does those atomically.
when thread_function() does the pthread_cond_signal(&thread_cv) it wakes up the waiting thread.
when the main() thread wakes up, it will first re-acquire the thread_lock...
...so it can then proceed to re-read thread_finished_count, to see if it is now 3.
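For completeness, the matching end of thread_function() would then increment and signal under the same mutex, rather than testing thread_finished_count after the unlock as the question's version does. A sketch, reusing the question's names and its chcek_status() helper:
    /* ...end of thread_function(), once the list is exhausted... */
    status = pthread_mutex_lock(&thread_lock);
    chcek_status(status);
    thread_finished_count++;
    if (thread_finished_count == NUM_THREADS)   /* i.e. 3 */
    {
        /* still holding thread_lock, so the count and the signal stay consistent */
        status = pthread_cond_signal(&thread_cv);
        chcek_status(status);
    }
    status = pthread_mutex_unlock(&thread_lock);
    chcek_status(status);
    return NULL;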
FWIW: I recommend not destroying the mutexes etc until after all the threads have been joined.
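Applied to the question's main(), that ordering would look roughly like this (a sketch, keeping the question's helper names such as wait_for_threads() and chcek_status()):
    /* wait for the workers to announce completion (as above) ... */

    /* join the threads first, so nobody can still be using the locks */
    wait_for_threads(threads, NUM_THREADS);

    /* only now tear down the synchronisation objects */
    status = pthread_mutex_destroy(&list_lock);
    chcek_status(status);
    status = pthread_mutex_destroy(&thread_lock);
    chcek_status(status);
    status = pthread_cond_destroy(&thread_cv);
    chcek_status(status);

    print_results(head);
    free_list(head);
    free(threads);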
Upvotes: 2