ashok v

Reputation: 408

How can I make putall and getall methods of a Queue subclass atomic?

I have a class that inherits from Queue, and is used by another factory class:

class myQueue(Queue):
    def putall(self, mobjects, *args):
        for mobject in mobjects:
            self.put(mobject, *args)

    def getall(self , number , *args ):
        return [self.get(*args) for _ in xrange(number)]

I use putall and getall externally, and want them to be atomic. How can I do that? I'm using Python 2.6.

Upvotes: 0

Views: 62

Answers (1)

dano

Reputation: 94961

In order to make putall and getall atomic, you need to hold a lock that will prevent any other methods that modify the Queue from running. The obvious choice would be to use the mutex object used internally by Queue, but that's implemented using a threading.Lock, which means it can't be taken recursively. That means the naive solution will cause a deadlock:

from Queue import Queue
import threading

class myQueue(Queue):
    def putall(self , objects , *args):
        with self.mutex:
            for object in objects:
                self.put(object, *args) # This will hang.

    def getall(self , number , *args ):
        with self.mutex:
            return [self.get(*args) for _ in xrange(number)] # This will hang

q = myQueue()
q.putall(["1", "2", "3"])
print(q.getall(2))

This deadlocks in whichever of putall or getall is called first, because each acquires mutex and then calls put/get, which tries to acquire the same non-recursive lock again on the same thread. You've got a few options to address this. The simplest is to replace the mutex instance created inside Queue.__init__ with a threading.RLock. Queue also builds three threading.Condition objects (not_empty, not_full and all_tasks_done) on top of that mutex, so those have to be recreated as well; otherwise they would still refer to the old lock:

from Queue import Queue
import threading

class myQueue(Queue):
    def __init__(self, *args, **kwargs):
        Queue.__init__(self, *args, **kwargs)
        self.mutex = threading.RLock()
        self.not_empty = threading.Condition(self.mutex)
        self.not_full = threading.Condition(self.mutex)
        self.all_tasks_done = threading.Condition(self.mutex)

    def putall(self , objects , *args):
        with self.mutex:
            for object in objects:
                self.put(object, *args)

    def getall(self , number , *args ):
        with self.mutex:
            return [self.get(*args) for _ in xrange(number)]

q = myQueue()
q.putall(["1", "2", "3"])

print(q.getall(2))

Output:

['1', '2']
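The Lock/RLock difference this fix relies on can be checked directly. The following is only a minimal sketch; the variable names are made up for illustration, and it uses non-blocking acquires so that it reports the result instead of hanging:

```python
import threading

lock = threading.Lock()
rlock = threading.RLock()

with lock:
    # A plain Lock cannot be acquired again, even by the thread holding it.
    # acquire(False) is non-blocking, so this returns a false value
    # rather than deadlocking.
    reacquired_lock = lock.acquire(False)

with rlock:
    # An RLock can be re-acquired by the thread that already owns it.
    reacquired_rlock = rlock.acquire(False)
    if reacquired_rlock:
        rlock.release()  # undo the extra acquire

print(bool(reacquired_lock))   # False
print(bool(reacquired_rlock))  # True
```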

Note that you're relying on implementation details of Queue here, which makes this susceptible to breaking if the implementation changes in a new version of Python. A more future-proof version of myQueue would wrap a Queue instance, provide implementations for all its public methods that require a mutex, and use your own recursive lock to synchronize those methods.
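A rough sketch of that wrapper approach might look like the following. The class name AtomicQueue is hypothetical, and only put, get, putall and getall are shown. Making every operation non-blocking is an assumption made for this sketch: a blocking get while holding the wrapper's own lock would keep producers from ever getting in.

```python
try:
    from Queue import Queue, Empty, Full  # Python 2, as in the question
except ImportError:
    from queue import Queue, Empty, Full  # Python 3

import threading


class AtomicQueue(object):
    """Hypothetical wrapper: owns a Queue and guards it with its own RLock."""

    def __init__(self, maxsize=0):
        self._queue = Queue(maxsize)
        self._lock = threading.RLock()  # independent of Queue's internal mutex

    def put(self, item):
        with self._lock:
            self._queue.put_nowait(item)  # raises Full instead of blocking

    def get(self):
        with self._lock:
            return self._queue.get_nowait()  # raises Empty instead of blocking

    def putall(self, items):
        items = list(items)
        with self._lock:
            # Check capacity first so the batch goes in completely or not at all.
            maxsize = self._queue.maxsize
            if maxsize > 0 and self._queue.qsize() + len(items) > maxsize:
                raise Full
            for item in items:
                self.put(item)  # re-entering the RLock is fine

    def getall(self, number):
        with self._lock:
            # All access goes through self._lock, so qsize() is exact here.
            if self._queue.qsize() < number:
                raise Empty
            return [self.get() for _ in range(number)]


q = AtomicQueue()
q.putall(["1", "2", "3"])
print(q.getall(2))  # ['1', '2']
```

Because nothing here touches Queue.mutex or the condition objects, it depends only on Queue's public interface. The trade-off is that callers have to handle Empty and Full themselves instead of blocking.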

Upvotes: 1
