Ib33X
Ib33X

Reputation: 7234

python conditional lock

How can I implement conditional lock in threaded application, for instance I haw 30 threads that are calling function and for most off the time all threads can access is simultaneous, but depending on function input there can be condition when only one thread can do that one thing. (If value for input is repeated and some thread is still working then I need lock.)

I now that there is module threading with Rlock() but I don't now how to use it in a way that i described it in first part.

Edit: The question is actually about how to prevent any two threads from running the same function with the same argument at the same time. (Thanks to David for helping me formulate my question :) )

Upvotes: 1

Views: 4140

Answers (2)

David Z
David Z

Reputation: 131550

Try this: have a lock in the module where your function is, and if the input to the function is such that locking is required, acquire the lock inside the function. Otherwise don't.

l = threading.RLock()

def fn(arg):
    if arg == arg_that_needs_lock:
        l.acquire()
        try:
            # do stuff
        finally:
            l.release()
    else:
        # do other stuff

EDIT:

As far as I can tell now, the question is actually about how to prevent any two threads from running the same function with the same argument at the same time. There's no problem with two threads running the same function with different arguments at the same time, though. The simple method to do this, if all valid arguments to the function can be dictionary keys, is to create a dictionary of arguments to locks:

import threading

dict_lock = threading.RLock()
locks = {}

def fn_dict(arg):
    dict_lock.acquire()
    try:
        if arg not in dict:
            locks[arg] = threading.RLock()
        l = locks[arg]
    finally:
        dict_lock.release()
    l.acquire()
    try:
        # do stuff
    finally:
        l.release()

If your function can be called with many different arguments, though, that amounts to a lot of locks. Probably a better way is to have a set of all arguments with which the function is currently executing, and have the contents of that set protected by a lock. I think this should work:

set_condition = threading.Condition()
current_args = set()

def fn_set(arg):
    set_condition.acquire()
    try:
        while arg in current_args:
            set_condition.wait()
        current_args.add(arg)
    finally:
        set_condition.release()
    # do stuff
    set_condition.acquire()
    try:
        current_args.remove(arg)
        set_condition.notifyAll()
    finally:
        set_condition.release()

Upvotes: 7

Henrik Gustafsson
Henrik Gustafsson

Reputation: 54108

It sounds like you want something similar to a Readers-Writer lock.

This is probably not what you want, but might be a clue:

from __future__ import with_statement
import threading

def RWLock(readers = 1, writers = 1):
    m = _Monitor(readers, writers)
    return (_RWLock(m.r_increment, m.r_decrement), _RWLock(m.w_increment, m.w_decrement))

class _RWLock(object):
    def __init__(self, inc, dec):
        self.inc = inc
        self.dec = dec

    def acquire(self):
        self.inc()
    def release(self):
        self.dec()
    def __enter__(self):
        self.inc()
    def __exit__(self):
        self.dec()

class _Monitor(object):
    def __init__(self, max_readers, max_writers):
        self.max_readers = max_readers
        self.max_writers = max_writers
        self.readers = 0
        self.writers = 0
        self.monitor = threading.Condition()

    def r_increment(self):
        with self.monitor:
            while self.writers > 0 and self.readers < self.max_readers:
                self.monitor.wait()
            self.readers += 1
            self.monitor.notify()

    def r_decrement(self):
        with self.monitor:
            while self.writers > 0:
                self.monitor.wait()
            assert(self.readers > 0)
            self.readers -= 1
            self.monitor.notify()

    def w_increment(self):
        with self.monitor:
            while self.readers > 0 and self.writers < self.max_writers:
                self.monitor.wait()
            self.writers += 1
            self.monitor.notify()

    def w_decrement(self):
        with self.monitor:
            assert(self.writers > 0)
            self.writers -= 1
            self.monitor.notify()

if __name__ == '__main__':

    rl, wl = RWLock()
    wl.acquire()
    wl.release()
    rl.acquire()
    rl.release()

(Unfortunately not tested)

Upvotes: 1

Related Questions