speedplane

Reputation: 16141

Python: Building a Reentrant Semaphore (combining RLock and Semaphore)

How would you go about combining threading.RLock with threading.Semaphore? Or does such a structure already exist?

In Python, there is a primitive for a reentrant lock, threading.RLock(), which allows the same thread to acquire a lock multiple times while blocking all other threads. There is also threading.Semaphore(N), which allows the lock to be acquired N times before blocking. How would one combine these two structures? I want up to N separate threads to be able to acquire the lock, but I'd like each thread's hold on it to be reentrant.
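To make the difference concrete, here is a minimal sketch using only the standard library (the variable names are illustrative). The RLock lets its owning thread back in, while the Semaphore treats a second acquire from the same thread as just another request for a slot:

```python
import threading

rlock = threading.RLock()
sem = threading.Semaphore(1)

# An RLock remembers its owner, so the holding thread can acquire it again.
with rlock:
    rlock_reentered = rlock.acquire(blocking=False)
    if rlock_reentered:
        rlock.release()

# A Semaphore has no owner: a nested acquire from the same thread needs a
# free slot, and with a value of 1 there is none left.
with sem:
    sem_reentered = sem.acquire(blocking=False)
    if sem_reentered:
        sem.release()

print(rlock_reentered, sem_reentered)
```

With a blocking acquire instead of blocking=False, the nested semaphore call would deadlock the thread, which is the problem a reentrant semaphore is meant to avoid.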

Upvotes: 4

Views: 549

Answers (2)

Ilya Egorov

Reputation: 251

Your implementation is already good. The threading.local() approach is simple and efficient, so with additional support for timeouts and non-blocking calls you would have a complete reentrant semaphore. However, I also wrote my own variant of combining these two structures, which I called aiologic.RCapacityLimiter (I'm the creator of aiologic).

import time

from concurrent.futures import ThreadPoolExecutor

from aiologic import RCapacityLimiter

limiter = RCapacityLimiter(2)


def subfunc(i):
    with limiter:
        assert limiter.borrowed_tokens <= 2

        time.sleep(0.5)

        print(f"it works! (thread #{i})")


def func(i):
    with limiter:
        subfunc(i)


with ThreadPoolExecutor(4) as executor:
    for i in range(4):
        executor.submit(func, i)

In contrast to semaphores, reentrant capacity limiters give you more information about what is happening at runtime:

  • RCapacityLimiter.waiting is the number of threads that are waiting to acquire the limiter.
  • RCapacityLimiter.available_tokens is the number of tokens still available, that is, how many more threads could acquire the limiter without blocking.
  • RCapacityLimiter.borrowed_tokens is the number of threads that have acquired the limiter.
  • RCapacityLimiter.total_tokens is the maximum number of threads.
  • RCapacityLimiter.borrowers is a dictionary that contains information about which thread has acquired the limiter how many times.

Along with this, you also get all the other features of the aiologic package, such as support for asynchronous libraries like asyncio.

Upvotes: 0

speedplane

Reputation: 16141

So I guess a reentrant semaphore does not exist in the standard library. Here is the implementation I came up with; comments are welcome.

import datetime
import logging
import threading
import time

class ReentrantSemaphore(object):
  '''A counting Semaphore which allows threads to reenter.'''
  def __init__(self, value = 1):
    self.local = threading.local()  # per-thread nesting depth
    self.sem = threading.Semaphore(value)

  def acquire(self):
    if not getattr(self.local, 'lock_level', 0):
      # We do not yet have the lock, acquire it.
      start = time.monotonic()
      self.sem.acquire()
      waited = time.monotonic() - start
      if waited > 3:
        logging.info("Took %.1f sec to lock.", waited)
      self.local.lock_time = datetime.datetime.now(datetime.timezone.utc)
      self.local.lock_level = 1
    else:
      # We already have the lock, just increment it due to the recursive call.
      self.local.lock_level += 1

  def release(self):
    if getattr(self.local, 'lock_level', 0) < 1:
      raise RuntimeError("Trying to release a released lock.")

    self.local.lock_level -= 1
    if self.local.lock_level == 0:
      # Outermost release on this thread: give the slot back.
      self.sem.release()

  def __enter__(self):
    self.acquire()
    return self

  def __exit__(self, t, v, tb):
    self.release()
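The other answer names timeouts and non-blocking calls as the remaining gap. Below is a minimal sketch of how those might be added, assuming the same threading.local() approach. The class name TimedReentrantSemaphore and the results dictionary are hypothetical, chosen only for this illustration. The blocking and timeout arguments are passed straight to threading.Semaphore.acquire, so they only matter on a thread's first (outermost) acquisition:

```python
import threading


class TimedReentrantSemaphore:
    """Hypothetical variant: per-thread reentrancy plus blocking/timeout."""

    def __init__(self, value=1):
        self._local = threading.local()
        self._sem = threading.Semaphore(value)

    def acquire(self, blocking=True, timeout=None):
        level = getattr(self._local, "level", 0)
        if level == 0:
            # Outermost acquire on this thread: take a real slot, or report
            # failure without touching the nesting counter.
            if not self._sem.acquire(blocking, timeout):
                return False
        self._local.level = level + 1
        return True

    def release(self):
        level = getattr(self._local, "level", 0)
        if level < 1:
            raise RuntimeError("release() called by a thread holding no slot")
        self._local.level = level - 1
        if level == 1:
            # Outermost release: hand the slot back to other threads.
            self._sem.release()

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, exc_type, exc, tb):
        self.release()


# Usage: one slot, held by the main thread.
sem = TimedReentrantSemaphore(1)
results = {}


def try_from_other_thread():
    # This thread holds nothing, so it needs a free slot and times out.
    results["other"] = sem.acquire(timeout=0.1)


with sem:
    # Nested acquire on the owning thread succeeds without a free slot.
    results["nested"] = sem.acquire(blocking=False)
    sem.release()
    worker = threading.Thread(target=try_from_other_thread)
    worker.start()
    worker.join()

print(results)
```

Returning False on a failed acquire mirrors the convention of threading.Semaphore, so callers can branch on the result the same way.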

Upvotes: 2
