Zack Yoshyaro

Reputation: 2124

Is it possible to manually lock/unlock a Queue?

I'm curious if there is a way to lock a multiprocessing.Queue object manually.

I have a pretty standard Producer/Consumer pattern set up in which my main thread is constantly producing a series of values, and a pool of multiprocessing.Process workers is acting on the values produced.

It is all controlled via a single multiprocessing.Queue().

import time
import multiprocessing


class Reader(multiprocessing.Process): 
    def __init__(self, queue): 
        multiprocessing.Process.__init__(self)
        self.queue = queue 

    def run(self): 
        while True: 
            item = self.queue.get() 
            if isinstance(item, str): 
                break 


if __name__ == '__main__': 

    queue = multiprocessing.Queue()
    reader = Reader(queue)
    reader.start()

    
    start_time = time.time()
    while time.time() - start_time < 10: 
        queue.put(1)
    queue.put('bla bla bla sentinel')
    reader.join()  # multiprocessing.Queue has no join(); wait for the worker instead

The issue I'm running into is that my worker pool cannot consume and process the queue as fast as the main thread inserts values into it. So after some period of time, the Queue grows so large that it raises a MemoryError.

An obvious solution would be to simply add a wait check in the producer to stall it from putting any more values into the queue. Something along the lines of:

while time.time() - start_time < 10: 
    queue.put(1)
    while queue.qsize() > some_size:
        time.sleep(.1)
queue.put('bla bla bla sentinel')
reader.join()  # as above: Queue objects have no join()
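(For what it's worth, multiprocessing.Queue also accepts a maxsize argument, which makes put() block once that many items are waiting, so the producer gets back-pressure for free instead of polling qsize(). A sketch of the producer loop rewritten that way; the limit of 1000 is an arbitrary illustrative value:)

```python
import multiprocessing

def produce(queue, n_items):
    # With a bounded queue, put() blocks while the queue is full,
    # so the producer can never outrun the consumers by more than maxsize.
    for _ in range(n_items):
        queue.put(1)
    queue.put('sentinel')  # any str tells the Reader to stop

queue = multiprocessing.Queue(maxsize=1000)
```
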

However, because of the funky nature of the program, I'd like to dump everything in the Queue to a file for later processing. But without being able to temporarily lock the queue, the worker can't drain it, because the producer is constantly refilling it with junk -- conceptually, anyway. After numerous tests it seems that at some point one of the locks wins (usually the one adding to the queue).

Edit: Also, I realize it'd be possible to simply stop the producer and consume it from that thread... but that makes the Single Responsibility guy in me feel sad, as the producer is a Producer, not a Consumer.

Edit:

After looking through the source of Queue, I came up with this:

def dump_queue(q):
    q._rlock.acquire()            # block other readers while we drain
    try:
        res = []
        while not q.empty():
            res.append(q._recv())  # pull one pickled item off the underlying pipe
            q._sem.release()       # hand back a capacity slot in the bounded semaphore
        return res
    finally:
        q._rlock.release()

However, I'm too scared to use it! I have no idea whether this is "correct" or not. I don't have a firm enough grasp to know if it will hold up without blowing up any of Queue's internals.

Anyone know if this'll break? :)

Upvotes: 2

Views: 1168

Answers (1)

Tim Peters

Reputation: 70735

Given what was said in the comments, a Queue is simply the wrong data structure for your problem - but it is likely part of a usable solution.

It sounds like you have only one Producer. Create a new, Producer-local (not shared across processes) class implementing the semantics you really need. For example,

class FlushingQueue:
    def __init__(self, mpqueue, path_to_spill_file, maxsize=1000, dumpsize=1000000):
        from collections import deque
        self.q = mpqueue  # a shared `multiprocessing.Queue`
        self.dump_path = path_to_spill_file
        self.maxsize = maxsize
        self.dumpsize = dumpsize
        self.d = deque()  # buffer for overflowing values

    def put(self, item):
        if self.q.qsize() < self.maxsize:
            self.q.put(item)
            # in case consumers have made real progress
            while self.d and self.q.qsize() < self.maxsize:
                self.q.put(self.d.popleft())
        else:
            self.d.append(item)
            if len(self.d) >= self.dumpsize:
                self.dump()

    def dump(self):
        # code to flush self.d to the spill file; no
        # need to look at self.q at all
        pass

I bet you can make this work :-)

Upvotes: 2
