stenci

Is there a parametric locking mechanism in Python?

By parametric I mean a lock that can be acquired with a parameter and blocks only the threads that use the same parameter value. The parameter can have thousands of different values, so it's impractical to keep a dictionary of threading.Lock objects with one entry per possible value.

A function on my web server accepts two parameters: a group and an element. Every time the function is called it checks whether the group's file has changed (very fast), and if it has, it does something with it (very slow). The slow processing of each group is completely independent from the other groups and can happen concurrently, but two elements of the same group cannot be processed at the same time, nor can an element be processed while its group is being processed.

I was able to get it to work using a global list of the groups currently being processed, but now that I'm done with it I think it's really ugly and there must be a better way.
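
Roughly what that workaround looks like (a sketch, not my actual code; it uses a set instead of a list, and the needs_update, update_group and do_something_with placeholders from the snippet further down):

import threading

processing = set()                     # groups currently being worked on (the ugly global)
processing_cv = threading.Condition()  # guards the set and lets waiters sleep

def process_element(group, element):
    # wait until no other thread is working on this group, then claim it
    with processing_cv:
        while group in processing:
            processing_cv.wait()
        processing.add(group)
    try:
        if needs_update(group):
            update_group(group)
        return do_something_with(group, element)
    finally:
        # release the group and wake up any threads waiting for it
        with processing_cv:
            processing.discard(group)
            processing_cv.notify_all()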

The snippet below shows what I'm looking for instead. It uses an imaginary LockWithGroup class. Is there something similar in Python?

process_lock = threading.LockWithGroup()   # imaginary: one independent lock per group value

def process_element(group, element):
    print('Start', group)

    # only one thread per group may check and refresh the group file
    with process_lock(group):
        if needs_update(group):
            print('Updating', group)
            update_group(group)
            print('Updated', group)

    # elements of the same group are processed one at a time
    with process_lock(group):
        retval = do_something_with(group, element)

    print('End', group)

    return retval

# the letters mark five concurrent calls, each running in its own thread
process_element('g1', e1)   # a
process_element('g1', e2)   #  b
process_element('g1', e3)   #   c
process_element('g2', e4)   #    d
process_element('g2', e5)   #     e

Output:

> Start g1                  # a
> Start g2                  #    d
> Updating g1               # a
> Updating g2               #    d
> Updated g1                # a
> End g1                    # a
> Start g1                  #  b
> End g1                    #  b
> Start g1                  #   c
> Updated g2                #    d
> End g2                    #    d
> Start g2                  #     e    
> End g1                    #   c
> End g2                    #     e

Upvotes: 3

Views: 1042

Answers (1)

stenci

Inspired by the answer to the post mentioned in the comment, I created a class that seems to do the job.

I used the code from that answer, added timeout and blocking arguments, and put it inside a class so I can use it as a context manager. The class keeps its state in module-level dictionaries and uses static methods, so it can be instantiated once or created many times (as the test below does in slow_worker_2).

The first part of the code is the class; the second part tests both the explicit acquire_lock/release_lock calls and the context manager used in a with statement.

import threading
import time

namespace_lock = threading.Lock()   # guards the two dictionaries below
namespace = {}                      # value -> threading.Lock currently in use for that value
counters = {}                       # value -> number of threads holding or waiting for that lock


class NamespaceLock:
    """Lock keyed on a value: only threads that use the same value block each other."""

    def __init__(self, group):
        self.group = group

    def __enter__(self):
        self.__class__.acquire_lock(self.group)

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.__class__.release_lock(self.group)

    @staticmethod
    def acquire_lock(value, blocking=True, timeout=-1.0):
        # Register this thread's interest in the per-value lock, creating it on first use.
        with namespace_lock:
            if value in namespace:
                counters[value] += 1
            else:
                namespace[value] = threading.Lock()
                counters[value] = 1

        acquired = namespace[value].acquire(blocking=blocking, timeout=timeout)
        if not acquired:
            # A failed (timed out) acquire never reaches release_lock(), so undo the bookkeeping here.
            with namespace_lock:
                if counters[value] == 1:
                    del counters[value]
                    del namespace[value]
                else:
                    counters[value] -= 1
        return acquired

    @staticmethod
    def release_lock(value):
        with namespace_lock:
            # last user: drop the per-value lock from the registry
            if counters[value] == 1:
                del counters[value]
                lock = namespace.pop(value)
            else:
                counters[value] -= 1
                lock = namespace[value]

        lock.release()


def slow_worker_1(group, seconds):
    if NamespaceLock.acquire_lock(group, timeout=2.5):
        print('Start   {} {}'.format(group, seconds))
        time.sleep(seconds)
        print('End     {} {}'.format(group, seconds))
        NamespaceLock.release_lock(group)
    else:
        print('Timeout {} {}'.format(group, seconds))


def slow_worker_2(group, seconds):
    with NamespaceLock(group):
        print('Start {} {}'.format(group, seconds))
        time.sleep(seconds)
        print('End   {} {}'.format(group, seconds))


def join_all(name):
    for t in threading.enumerate():
        if t.name == name:
            t.join()


if __name__ == '__main__':
    print('explicit acquire and release')

    threading.Thread(target=slow_worker_1, args=('g1', 1), name='worker').start()
    threading.Thread(target=slow_worker_1, args=('g2', 2), name='worker').start()
    threading.Thread(target=slow_worker_1, args=('g1', 3), name='worker').start()
    threading.Thread(target=slow_worker_1, args=('g2', 4), name='worker').start()
    threading.Thread(target=slow_worker_1, args=('g1', 5), name='worker').start()

    join_all('worker')

    print('context manager')

    threading.Thread(target=slow_worker_2, args=('g1', 1), name='worker').start()
    threading.Thread(target=slow_worker_2, args=('g2', 2), name='worker').start()
    threading.Thread(target=slow_worker_2, args=('g1', 3), name='worker').start()
    threading.Thread(target=slow_worker_2, args=('g2', 4), name='worker').start()
    threading.Thread(target=slow_worker_2, args=('g1', 5), name='worker').start()

    join_all('worker')
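
To tie this back to the question, this is roughly how process_element would use the class (a sketch; needs_update, update_group and do_something_with are the question's placeholder functions):

def process_element(group, element):
    with NamespaceLock(group):
        if needs_update(group):
            update_group(group)
        return do_something_with(group, element)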

Upvotes: 1
