Reputation: 2124
I'm curious if there is a way to lock a multiprocessing.Queue object manually.
I have a pretty standard Producer/Consumer pattern set up in which my main thread is constantly producing a series of values, and a pool of multiprocessing.Process workers is acting on the values produced. It is all controlled via a sole multiprocessing.Queue().
import time
import multiprocessing

class Reader(multiprocessing.Process):
    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            item = self.queue.get()
            if isinstance(item, str):  # a string is the shutdown sentinel
                break

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    reader = Reader(queue)
    reader.start()

    start_time = time.time()
    while time.time() - start_time < 10:
        queue.put(1)
    queue.put('bla bla bla sentinel')
    reader.join()  # multiprocessing.Queue has no join(); wait for the worker instead
The issue I'm running into is that my worker pool cannot consume and process the queue as fast as the main thread inserts values into it. So after some period of time, the Queue grows so unwieldy that it raises a MemoryError.
An obvious solution would be to simply add a wait check in the producer to stall it from putting any more values into the queue. Something along the lines of:
    while time.time() - start_time < 10:
        queue.put(1)
        while queue.qsize() > some_size:
            time.sleep(.1)
    queue.put('bla bla bla sentinel')
    reader.join()
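(For what it's worth, a bounded queue gives roughly the same back-pressure without the polling loop, since put() blocks once the queue is full. A minimal sketch, assuming the same Reader worker is draining it:)

    # create the queue with a cap; put() then blocks until a worker frees a slot
    queue = multiprocessing.Queue(maxsize=some_size)
    reader = Reader(queue)
    reader.start()

    start_time = time.time()
    while time.time() - start_time < 10:
        queue.put(1)            # blocks here instead of sleeping on qsize()
    queue.put('bla bla bla sentinel')
    reader.join()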
However, because of the funky nature of the program, I'd like to dump everything in the Queue to a file for later processing. But! Without being able to temporarily lock the queue, the worker can't consume everything in it as the producer is constantly filling it back up with junk -- conceptually anyway. After numerous tests it seems that at some point one of the locks wins (but usually the one adding to the queue).
Edit: Also, I realize it'd be possible to simply stop the producer and consume it from that thread... but that makes the Single Responsibility guy in me feel sad, as the producer is a Producer, not a Consumer.
After looking through the source of Queue, I came up with this:
def dump_queue(q):
    q._rlock.acquire()             # block other readers while we drain
    try:
        res = []
        while not q.empty():
            res.append(q._recv())  # read straight off the underlying pipe
            q._sem.release()       # give back the slot the item occupied
        return res
    finally:
        q._rlock.release()
However, I'm too scared to use it! I have no idea if this is "correct" or not. I don't have a firm enough grasp to know if this'll hold up without blowing up any of Queue's internals.
Anyone know if this'll break? :)
Upvotes: 2
Views: 1168
Reputation: 70735
Given what was said in the comments, a Queue is simply the wrong data structure for your problem, but it is likely part of a usable solution.
It sounds like you have only one Producer. Create a new, Producer-local (not shared across processes) class implementing the semantics you really need. For example,
class FlushingQueue:
    def __init__(self, mpqueue, path_to_spill_file, maxsize=1000, dumpsize=1000000):
        from collections import deque
        self.q = mpqueue  # a shared `multiprocessing.Queue`
        self.dump_path = path_to_spill_file
        self.maxsize = maxsize
        self.dumpsize = dumpsize
        self.d = deque()  # buffer for overflowing values

    def put(self, item):
        if self.q.qsize() < self.maxsize:
            self.q.put(item)
            # in case consumers have made real progress
            while self.d and self.q.qsize() < self.maxsize:
                self.q.put(self.d.popleft())
        else:
            self.d.append(item)
            if len(self.d) >= self.dumpsize:
                self.dump()

    def dump(self):
        # code to flush self.d to the spill file; no
        # need to look at self.q at all
        pass
I bet you can make this work :-)
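Here is a rough usage sketch, assuming the Reader worker from your question and the FlushingQueue class above are already defined. The pickle-based dump() and the spill-file name are my own guesses, not something the class dictates:

import time
import pickle
import multiprocessing

class MyFlushingQueue(FlushingQueue):       # hypothetical subclass just to fill in dump()
    def dump(self):
        # append the buffered items to the spill file and empty the buffer
        with open(self.dump_path, 'ab') as f:
            while self.d:
                pickle.dump(self.d.popleft(), f)

if __name__ == '__main__':
    mpq = multiprocessing.Queue()
    reader = Reader(mpq)                    # the worker from the question
    reader.start()

    fq = MyFlushingQueue(mpq, 'spill.pkl', maxsize=1000, dumpsize=100000)
    start_time = time.time()
    while time.time() - start_time < 10:
        fq.put(1)                           # overflow past maxsize is buffered and spilled
    mpq.put('bla bla bla sentinel')         # shut the worker down as before
    reader.join()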
Upvotes: 2