Kevin Mauro

Reputation: 57

Unexpected behavior adding, removing, re-adding to a python Queue

I want to store a list in a python queue. Rather than storing the list structure as a single queue item, I figured I would try to add each element of the list to the queue one by one.

from multiprocessing import Process,Queue;
import time;
import sys;
def something1(q):
    q.put("hello");
    time.sleep(2);
    q.put("hello4");

def something2(q):
    q.put("hello2");
    time.sleep(1);
    q.put("hello3");


def run():
    queue = Queue();
    t1=Process(target=something1,args=(queue,));
    t1.start();
    t2=Process(target=something2,args=(queue,));
    t2.start();
    time.sleep(3);
    #queue.task_done();
    lol = [];
    while(queue.qsize() !=0):
        lol.append(queue.get(False));

    for l in lol:
        print("inside lol",l);
        queue.put(l);
    print("Queue size",queue.qsize());
    sys.stdout.flush();
    while(queue.qsize() !=0):
        print("inside queue",queue.get(False));
run();

I think the test code speaks for itself... I just want to 'get' from a queue (which is really a pop), then at the end of doing what I'm doing I'd like to re-add the whole list to the queue so other processes can use the structure.

One would expect:

('inside lol', 'hello')
('inside lol', 'hello2')
('inside lol', 'hello3')
('inside lol', 'hello4')
('Queue size', 4)
('inside queue', 'hello')
('inside queue', 'hello2')
('inside queue', 'hello3')
('inside queue', 'hello4')

But instead I get:

('inside lol', 'hello')
('inside lol', 'hello2')
('inside lol', 'hello3')
('inside lol', 'hello4')
('Queue size', 4L)
Traceback (most recent call last):
  File "mpTest.py", line 34, in <module>
    run();
  File "mpTest.py", line 33, in run
    print("inside queue",queue.get(False));
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 134, in get
    raise Empty
Queue.Empty

Queue size is 4L? Huh?

Second confusing thing is that my Queue object has no function "task_done", which is absurd because in the docs it is definitely there.

And the main confusing thing: why can't I put, get, and then put again like this? This is horrendously bad for a much bigger project that I'm working on; this is just a simple test script to help me understand what I'm doing wrong.

Upvotes: 0

Views: 144

Answers (1)

user3788941


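This is a timing issue rather than a flaw in your logic. put() on a multiprocessing Queue hands the item to a background feeder thread, which writes it to the underlying pipe a moment later. A non-blocking get() issued right after a put() can therefore raise Empty even though qsize() already counts the item. A short sleep before reading gives the feeder thread time to flush. The Python multiprocessing docs hint at this buffering: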

Warning As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread), then that process will not terminate until all buffered items have been flushed to the pipe. This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children.

So, your code runs fine for me with:

from multiprocessing import Process,Queue;
import time;
import sys;
def something1(q):
    q.put("hello");
    time.sleep(2);
    q.put("hello4");

def something2(q):
    q.put("hello2");
    time.sleep(1);
    q.put("hello3");


def run():
    queue = Queue();
    t1=Process(target=something1,args=(queue,));
    t1.start();
    t2=Process(target=something2,args=(queue,));
    t2.start();
    time.sleep(3);
    #queue.task_done();
    lol = [];
    while(queue.qsize() !=0):
        lol.append(queue.get(False));

    for l in lol:
        print("inside lol",l);
        queue.put(l);
    print("Queue size",queue.qsize());
    sys.stdout.flush();
    time.sleep(0.1)  # added: give the feeder thread time to flush the re-added items to the pipe
    while(queue.qsize() !=0):
        print("inside queue",queue.get(False));
run();

Note the key line is

time.sleep(0.1)

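before getting from the queue again. As for the other two points: the 4L is just how Python 2 prints a long integer, so the size really is 4, and task_done() belongs to multiprocessing.JoinableQueue (and the thread-based Queue.Queue), not to the plain multiprocessing.Queue you are creating.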
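
A fixed sleep is only a guess at how long the flush takes. One possible alternative, sketched below, is a blocking get() with a timeout, which waits for the feeder thread instead of racing it. The helper names drain and requeue_roundtrip are made up for this example, and it is written for Python 3 (in Python 2 the Empty exception lives in the Queue module). It also avoids qsize(), which the docs describe as unreliable and which is not implemented on some platforms such as macOS.

```python
from multiprocessing import Queue
from queue import Empty  # Python 2: from Queue import Empty


def drain(q, timeout=0.5):
    """Collect items until nothing new arrives within `timeout` seconds."""
    items = []
    while True:
        try:
            # Blocking get: waits for the feeder thread to flush, up to `timeout`.
            items.append(q.get(timeout=timeout))
        except Empty:
            return items


def requeue_roundtrip(values):
    """Put, get, put again, get again: the pattern from the question."""
    q = Queue()
    for v in values:
        q.put(v)
    first = drain(q)       # first pass empties the queue
    for v in first:
        q.put(v)           # re-add everything for other consumers
    second = drain(q)      # second pass sees the re-added items
    return first, second
```

Calling requeue_roundtrip(["hello", "hello2", "hello3", "hello4"]) should return the same four items from both passes. The trade-off is that each drain() call waits one full timeout after the last item before it gives up.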

Upvotes: 1
