Reputation: 529
I am writing an application in which I want to allow more than one thread to use the same resource at the same time, but I don't understand how that can be achieved. So far I have got to this point: I have two threads, A and B, and two locks. Thread A runs first and then thread B. I would like to add a third thread as well - which is not a problem in itself - but I want threads A and B to execute concurrently and, only after both have completed, allow the third thread to take control of the common resource.
from threading import Lock, Thread
import logging
import time
class SynchronizedThreads:
    """Demo of two worker threads sharing a pair of locks.

    Both workers take lock A first and keep it until they finish, then take
    lock B inside it. Because the acquisition order is identical in both
    workers, their critical sections never overlap and they cannot deadlock
    on each other.
    """

    def __init__(self, time_scale=1.0):
        """Create the locks, the thread list, and configure logging.

        Args:
            time_scale: Multiplier applied to every simulated-work sleep.
                The default of 1.0 keeps the original durations; 0 makes
                the demo run without pausing.

        Raises:
            ValueError: If ``time_scale`` is negative.
        """
        if time_scale < 0:
            raise ValueError("time_scale must be non-negative")
        self.time_scale = time_scale
        # Initialize a list for threads.
        self.threads = []
        # Initialize lock objects.
        self.lock_a = Lock()
        self.lock_b = Lock()
        # Set the logging format.
        log_format = "%(asctime)s: %(message)s"
        logging.basicConfig(format=log_format, level=logging.INFO, datefmt="%H:%M:%S")

    def _simulate_work(self, seconds):
        """Sleep for ``seconds`` scaled by ``self.time_scale``."""
        time.sleep(seconds * self.time_scale)

    def thread_a(self):
        """Worker A: take lock A, then lock B, simulating work under each."""
        logging.info('Thread A is starting ...')
        logging.info('Thread A is waiting to acquire lock A.')
        # ``with`` releases each lock even if the body raises.
        with self.lock_a:
            logging.info('Thread A has acquired lock A, performing some calculation...')
            self._simulate_work(2)
            logging.info('Thread A is waiting to acquire lock B.')
            with self.lock_b:
                logging.info('Thread A has acquired lock B, performing some calculation...')
                self._simulate_work(2)
                logging.info('Thread A is releasing both locks.')

    def thread_b(self):
        """Worker B: take lock A, then lock B, simulating work under each.

        The log messages name the lock that is actually being acquired.
        """
        logging.info('Thread B is starting...')
        logging.info('Thread B is waiting to acquire lock A.')
        with self.lock_a:
            logging.info('Thread B has acquired lock A, performing some calculation...')
            self._simulate_work(5)
            logging.info('Thread B is waiting to acquire lock B.')
            with self.lock_b:
                logging.info('Thread B has acquired lock B, performing some calculation...')
                self._simulate_work(5)
                logging.info('Thread B is releasing both locks.')

    def start_threads(self):
        """Create and start one thread per worker, recording each in ``self.threads``."""
        for thread_func in (self.thread_a, self.thread_b):
            thread = Thread(target=thread_func)
            self.threads.append(thread)
            thread.start()

    def join_threads(self):
        """Block until every thread in ``self.threads`` has finished."""
        for thread in self.threads:
            thread.join()
def main():
    """Run the demo: start the worker threads, wait for them, then log completion."""
    demo = SynchronizedThreads()
    demo.start_threads()
    demo.join_threads()
    logging.info('Finished')


# Only run the demo when executed as a script, not when imported.
if __name__ == '__main__':
    main()
Upvotes: 1
Views: 575
Reputation: 10827
One possible solution is to use a semaphore. It can be thought of as a thread-safe counter: since there are two things that need to be done, have the third thread wait on (acquire) the semaphore twice, while each worker releases it once when it finishes. This works even if threads A and B happen to finish at the same time; in this simple example they finish one after the other, and the third thread safely waits for both of them:
from threading import Lock, Thread, Semaphore
import logging
import time
class SynchronizedThreads:
    """Demo of two lock-sharing workers followed by a third thread.

    Workers A and B each take lock A and then lock B. When a worker is done
    it releases the semaphore once. Thread C acquires the semaphore twice,
    so it only starts its own work after both workers have signalled.
    """

    def __init__(self, time_scale=1.0):
        """Create the locks, semaphore, thread list, and configure logging.

        Args:
            time_scale: Multiplier applied to every simulated-work sleep.
                The default of 1.0 keeps the original durations; 0 makes
                the demo run without pausing.

        Raises:
            ValueError: If ``time_scale`` is negative.
        """
        if time_scale < 0:
            raise ValueError("time_scale must be non-negative")
        self.time_scale = time_scale
        # Initialize a list for threads.
        self.threads = []
        # Initialize lock objects.
        self.lock_a = Lock()
        self.lock_b = Lock()
        # Starts at zero so thread C blocks until a worker releases it.
        self.sem = Semaphore(0)
        # Set the logging format.
        log_format = "%(asctime)s: %(message)s"
        logging.basicConfig(format=log_format, level=logging.INFO, datefmt="%H:%M:%S")

    def _simulate_work(self, seconds):
        """Sleep for ``seconds`` scaled by ``self.time_scale``."""
        time.sleep(seconds * self.time_scale)

    def thread_a(self):
        """Worker A: work under lock A then lock B, then signal the semaphore."""
        try:
            logging.info('Thread A is starting ...')
            logging.info('Thread A is waiting to acquire lock A.')
            # ``with`` releases each lock even if the body raises.
            with self.lock_a:
                logging.info('Thread A has acquired lock A, performing some calculation...')
                self._simulate_work(2)
                logging.info('Thread A is waiting to acquire lock B.')
                with self.lock_b:
                    logging.info('Thread A has acquired lock B, performing some calculation...')
                    self._simulate_work(2)
                    logging.info('Thread A is releasing both locks.')
        finally:
            # Always signal, otherwise thread C would wait forever if this
            # worker failed part-way through.
            logging.info("Thread A is signaling that it's done to the semaphore")
            self.sem.release()

    def thread_b(self):
        """Worker B: work under lock A then lock B, then signal the semaphore.

        The log messages name the lock that is actually being acquired.
        """
        try:
            logging.info('Thread B is starting...')
            logging.info('Thread B is waiting to acquire lock A.')
            with self.lock_a:
                logging.info('Thread B has acquired lock A, performing some calculation...')
                self._simulate_work(5)
                logging.info('Thread B is waiting to acquire lock B.')
                with self.lock_b:
                    logging.info('Thread B has acquired lock B, performing some calculation...')
                    self._simulate_work(5)
                    logging.info('Thread B is releasing both locks.')
        finally:
            # Always signal, otherwise thread C would wait forever if this
            # worker failed part-way through.
            logging.info("Thread B is signaling that it's done to the semaphore")
            self.sem.release()

    def thread_c(self):
        """Wait for both workers to signal the semaphore, then do the final work."""
        logging.info('Thread C is starting...')
        # Two workers, wait for both of them
        expected_workers = 2
        for _ in range(expected_workers):
            logging.info('Thread C is waiting for the semaphore...')
            self.sem.acquire()
            logging.info("Thread C got the semaphore")
        logging.info("Thread C doing some work...")
        self._simulate_work(5)
        logging.info("Thread C all done")

    def start_threads(self):
        """Create and start one thread per worker, recording each in ``self.threads``."""
        for thread_func in (self.thread_a, self.thread_b, self.thread_c):
            thread = Thread(target=thread_func)
            self.threads.append(thread)
            thread.start()

    def join_threads(self):
        """Block until every thread in ``self.threads`` has finished."""
        for thread in self.threads:
            thread.join()
def main():
    """Start all three demo threads, wait until they finish, then log 'Finished'."""
    coordinator = SynchronizedThreads()
    coordinator.start_threads()
    coordinator.join_threads()
    logging.info('Finished')


# Only run the demo when executed as a script, not when imported.
if __name__ == '__main__':
    main()
Upvotes: 1