Reputation: 7234
How can I implement conditional lock in threaded application, for instance I haw 30 threads that are calling function and for most off the time all threads can access is simultaneous, but depending on function input there can be condition when only one thread can do that one thing. (If value for input is repeated and some thread is still working then I need lock.)
I now that there is module threading with Rlock() but I don't now how to use it in a way that i described it in first part.
Edit: The question is actually about how to prevent any two threads from running the same function with the same argument at the same time. (Thanks to David for helping me formulate my question :) )
Upvotes: 1
Views: 4140
Reputation: 131550
Try this: have a lock in the module where your function is, and if the input to the function is such that locking is required, acquire the lock inside the function. Otherwise don't.
l = threading.RLock()
def fn(arg):
if arg == arg_that_needs_lock:
l.acquire()
try:
# do stuff
finally:
l.release()
else:
# do other stuff
EDIT:
As far as I can tell now, the question is actually about how to prevent any two threads from running the same function with the same argument at the same time. There's no problem with two threads running the same function with different arguments at the same time, though. The simple method to do this, if all valid arguments to the function can be dictionary keys, is to create a dictionary of arguments to locks:
import threading
dict_lock = threading.RLock()
locks = {}
def fn_dict(arg):
dict_lock.acquire()
try:
if arg not in dict:
locks[arg] = threading.RLock()
l = locks[arg]
finally:
dict_lock.release()
l.acquire()
try:
# do stuff
finally:
l.release()
If your function can be called with many different arguments, though, that amounts to a lot of locks. Probably a better way is to have a set of all arguments with which the function is currently executing, and have the contents of that set protected by a lock. I think this should work:
set_condition = threading.Condition()
current_args = set()
def fn_set(arg):
set_condition.acquire()
try:
while arg in current_args:
set_condition.wait()
current_args.add(arg)
finally:
set_condition.release()
# do stuff
set_condition.acquire()
try:
current_args.remove(arg)
set_condition.notifyAll()
finally:
set_condition.release()
Upvotes: 7
Reputation: 54108
It sounds like you want something similar to a Readers-Writer lock.
This is probably not what you want, but might be a clue:
from __future__ import with_statement
import threading
def RWLock(readers = 1, writers = 1):
m = _Monitor(readers, writers)
return (_RWLock(m.r_increment, m.r_decrement), _RWLock(m.w_increment, m.w_decrement))
class _RWLock(object):
def __init__(self, inc, dec):
self.inc = inc
self.dec = dec
def acquire(self):
self.inc()
def release(self):
self.dec()
def __enter__(self):
self.inc()
def __exit__(self):
self.dec()
class _Monitor(object):
def __init__(self, max_readers, max_writers):
self.max_readers = max_readers
self.max_writers = max_writers
self.readers = 0
self.writers = 0
self.monitor = threading.Condition()
def r_increment(self):
with self.monitor:
while self.writers > 0 and self.readers < self.max_readers:
self.monitor.wait()
self.readers += 1
self.monitor.notify()
def r_decrement(self):
with self.monitor:
while self.writers > 0:
self.monitor.wait()
assert(self.readers > 0)
self.readers -= 1
self.monitor.notify()
def w_increment(self):
with self.monitor:
while self.readers > 0 and self.writers < self.max_writers:
self.monitor.wait()
self.writers += 1
self.monitor.notify()
def w_decrement(self):
with self.monitor:
assert(self.writers > 0)
self.writers -= 1
self.monitor.notify()
if __name__ == '__main__':
rl, wl = RWLock()
wl.acquire()
wl.release()
rl.acquire()
rl.release()
(Unfortunately not tested)
Upvotes: 1