Reputation: 57
I want to store a list in a Python queue. Rather than storing the list structure as a single queue item, I figured I would try to add each element of the list to the queue one by one.
from multiprocessing import Process,Queue;
import time;
import sys;

def something1(q):
    q.put("hello");
    time.sleep(2);
    q.put("hello4");

def something2(q):
    q.put("hello2");
    time.sleep(1);
    q.put("hello3");

def run():
    queue = Queue();
    t1=Process(target=something1,args=(queue,));
    t1.start();
    t2=Process(target=something2,args=(queue,));
    t2.start();
    time.sleep(3);
    #queue.task_done();
    lol = [];
    while(queue.qsize() !=0):
        lol.append(queue.get(False));
    for l in lol:
        print("inside lol",l);
        queue.put(l);
    print("Queue size",queue.qsize());
    sys.stdout.flush();
    while(queue.qsize() !=0):
        print("inside queue",queue.get(False));

run();
I think the test code speaks for itself... I just want to 'get' from a queue (which is really a pop), then at the end of doing what I'm doing I'd like to re-add the whole list to the queue so other processes can use the structure.
One would expect:
('inside lol', 'hello')
('inside lol', 'hello2')
('inside lol', 'hello3')
('inside lol', 'hello4')
('Queue size', 4)
('inside queue', 'hello')
('inside queue', 'hello2')
('inside queue', 'hello3')
('inside queue', 'hello4')
But instead I get:
('inside lol', 'hello')
('inside lol', 'hello2')
('inside lol', 'hello3')
('inside lol', 'hello4')
('Queue size', 4L)
Traceback (most recent call last):
  File "mpTest.py", line 34, in <module>
    run();
  File "mpTest.py", line 33, in run
    print("inside queue",queue.get(False));
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 134, in get
    raise Empty
Queue.Empty
Queue size is 4L? Huh?
The second confusing thing is that my Queue object has no "task_done" method, which is absurd because it is definitely in the docs.
And the main confusing thing: why can't I put, get, then put again like this? This is horrendously bad for a much bigger project I'm working on; this is just a simple test script to help me understand what I'm doing wrong.
Upvotes: 0
Views: 144
Reputation:
Any time you hit an error like this, you need to add a quick sleep to give the queue time to catch up: items put on a multiprocessing.Queue are fed through a background thread and a pipe, so they are not necessarily visible to get() immediately after put() returns. The Python multiprocessing docs hint at this:
Warning: As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread), then that process will not terminate until all buffered items have been flushed to the pipe. This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children.
So, your code runs fine for me with:
from multiprocessing import Process,Queue;
import time;
import sys;

def something1(q):
    q.put("hello");
    time.sleep(2);
    q.put("hello4");

def something2(q):
    q.put("hello2");
    time.sleep(1);
    q.put("hello3");

def run():
    queue = Queue();
    t1=Process(target=something1,args=(queue,));
    t1.start();
    t2=Process(target=something2,args=(queue,));
    t2.start();
    time.sleep(3);
    #queue.task_done();
    lol = [];
    while(queue.qsize() !=0):
        lol.append(queue.get(False));
    for l in lol:
        print("inside lol",l);
        queue.put(l);
    print("Queue size",queue.qsize());
    sys.stdout.flush();
    time.sleep(0.1) # added by me
    while(queue.qsize() !=0):
        print("inside queue",queue.get(False));

run();
Note the key line is
time.sleep(0.1)
before getting from the queue again.
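If you'd rather not hand-tune a sleep, another option is a blocking get() with a timeout: instead of checking qsize() and then doing a non-blocking get, you wait up to the timeout for each item to make it through the feeder pipe. This is a minimal sketch, not from the original answer; drain_queue is a hypothetical helper name:

from Queue import Empty  # Python 2.7, as in the traceback; on Python 3 use "from queue import Empty"

def drain_queue(q, timeout=0.5):
    # Block up to `timeout` seconds per get() so items still sitting
    # in the queue's feeder pipe have time to arrive, instead of
    # trusting qsize() plus a non-blocking get(False).
    items = []
    while True:
        try:
            items.append(q.get(timeout=timeout))
        except Empty:
            break  # nothing new within the timeout; treat as drained
    return items

With this, the final loop in run() becomes for item in drain_queue(queue): print("inside queue", item). The trade-off is that the loop always waits one full timeout after the last item before returning.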
Upvotes: 1