Reputation: 16141
How would you go about combining threading.RLock
with threading.Semaphore
? Or does such a structure already exist?
In Python, there is a primitive for a Reentrant lock, threading.RLock(N)
, which allows the same thread to acquire a lock multiple times, but no other threads can. There is also threading.Semaphore(N)
, which allows the lock to be acquired N
times before blocking. How would one combine these two structures? I want up to N
separate threads to be able to acquire the lock, but I'd like each individual lock on a thread to be a reentrant one.
Upvotes: 4
Views: 549
Reputation: 251
Your implementation is already good. The threading.local()
approach is simple and efficient, so with additional support for timeouts and non-blocking calls you would have a complete reentrant semaphore. However, I also wrote my own variant of combining these two structures, which I called aiologic.RCapacityLimiter
(I'm the creator of aiologic).
import time
from concurrent.futures import ThreadPoolExecutor
from aiologic import RCapacityLimiter
limiter = RCapacityLimiter(2)
def subfunc(i):
with limiter:
assert limiter.borrowed_tokens <= 2
time.sleep(0.5)
print(f"it works! (thread #{i})")
def func(i):
with limiter:
subfunc(i)
with ThreadPoolExecutor(4) as executor:
for i in range(4):
executor.submit(func, i)
In contrast to semaphores, reentrant capacity limiters give you more information about what is happening at runtime:
RCapacityLimiter.waiting
is the number of threads that are waiting to acquire the limiter.RCapacityLimiter.available_tokens
is the number of available non-blocking calls.RCapacityLimiter.borrowed_tokens
is the number of threads that have acquired the limiter.RCapacityLimiter.total_tokens
is the maximum number of threads.RCapacityLimiter.borrowers
is a dictionary that contains information about which thread has acquired the limiter how many times.Along with this, you also get all the other features of the aiologic
package, such as support for asynchronous libraries like asyncio
.
Upvotes: 0
Reputation: 16141
So I guess a Reentrant semaphore does not exist. Here is the implementation I came up with, happy to entertain comments.
import threading
import datetime
class ReentrantSemaphore(object):
'''A counting Semaphore which allows threads to reenter.'''
def __init__(self, value = 1):
self.local = threading.local()
self.sem = threading.Semaphore(value)
def acquire(self):
if not getattr(self.local, 'lock_level', 0):
# We do not yet have the lock, acquire it.
start = datetime.datetime.utcnow()
self.sem.acquire()
end = datetime.datetime.utcnow()
if end - start > datetime.timedelta(seconds = 3):
logging.info("Took %d Sec to lock."%((end - start).total_seconds()))
self.local.lock_time = end
self.local.lock_level = 1
else:
# We already have the lock, just increment it due to the recursive call.
self.local.lock_level += 1
def release(self):
if getattr(self.local, 'lock_level', 0) < 1:
raise Exception("Trying to release a released lock.")
self.local.lock_level -= 1
if self.local.lock_level == 0:
self.sem.release()
__enter__ = acquire
def __exit__(self, t, v, tb):
self.release()
Upvotes: 2