bunbun

Reputation: 2676

Iterate through multiprocessing queue

I have two multiprocessing processes: one adds items to a queue, and the other needs to iterate through the current queue. How do I do that iteration? Alternatively, how do I convert the current queue to a list so I can iterate over it?

Some pseudocode:

import multiprocessing as mp
thequeue = mp.Queue()
def func1():
    global thequeue
    while True:
        item = readstream()
        if item is not None:
            thequeue.put(item)
def func2():
    while True:
        for item in thequeue:    # This only works for Lists, how to do this for queues?
            if item == "hi":
                print(item)
def main():
    mp.Process(target=func1).start()
    mp.Process(target=func2).start()

Upvotes: 6

Views: 10679

Answers (2)

user2357112

Reputation: 280251

multiprocessing.Queue doesn't support iteration directly, because a for loop over a container is expected not to modify the container. Such nondestructive iteration is both impossible to support in the multiprocessing.Queue implementation and fundamentally inappropriate for the use cases multiprocessing.Queue was designed for.

Consumers should use get, which retrieves and removes items from the queue:

def func2():
    while True:
        item = thequeue.get()
        if item == 'hi':
            print(item)

If you prefer the code structure of a for loop, you can use two-argument iter as shown in Sraw's answer, but you'll still remove items from the queue that way. It is not possible to iterate over a multiprocessing.Queue without removing items.
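If what you really want is a list of whatever is in the queue right now, one option is to drain the queue into a list and loop over that. This is only a sketch: drain and its timeout parameter are made-up names, not part of multiprocessing, and the items are still removed from the queue, as described above. The short timeout is an assumption to give items that were just put a moment to arrive.

```python
import multiprocessing as mp
import queue  # only needed for the queue.Empty exception


def drain(q, timeout=0.1):
    """Remove and return the items currently obtainable from q as a list.

    `drain` and `timeout` are hypothetical names for this sketch.
    """
    items = []
    while True:
        try:
            # A short timeout gives the queue's feeder thread time to
            # deliver items that were put just before this call.
            items.append(q.get(timeout=timeout))
        except queue.Empty:
            # Nothing arrived within the timeout: treat the queue as empty.
            return items


if __name__ == "__main__":
    thequeue = mp.Queue()
    for word in ["hello", "hi", "bye"]:
        thequeue.put(word)
    snapshot = drain(thequeue)  # thequeue is empty after this call
    for item in snapshot:       # an ordinary list, safe to loop over repeatedly
        if item == "hi":
            print(item)
```

Keep in mind that the list is only a snapshot. Anything the producer puts after drain returns stays in the queue until the next call.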

Upvotes: 4

Sraw

Reputation: 20206

If you want to write your code in terms of a for loop, you can use the two-argument form of iter:

def func2():
    for item in iter(thequeue.get, None):
        print(item)  # do what you want with each item

To stop the loop, put a None into thequeue: iter(thequeue.get, None) ends as soon as get returns a value equal to None. If None is a legitimate item in your case, use a different sentinel value as the stop signal.

Note that unlike a normal for loop, this will remove items from the queue, just like calling get manually would. There is no way to iterate through an inter-process queue without removing items.
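For reference, here is a minimal end-to-end sketch of the sentinel pattern with a producer process. The names producer and consume and the sample items are invented for illustration, and the queue is passed as an argument instead of being used as a global, on the assumption that the code should also work when child processes do not inherit globals.

```python
import multiprocessing as mp


def producer(q, items):
    """Put every item on the queue, then a None sentinel to signal the end."""
    for item in items:
        q.put(item)
    q.put(None)


def consume(q):
    """Collect items until the None sentinel arrives.

    Each q.get() call made by iter() removes one item from the queue.
    """
    received = []
    for item in iter(q.get, None):
        received.append(item)
    return received


if __name__ == "__main__":
    thequeue = mp.Queue()
    # Pass the queue explicitly to the child process.
    p = mp.Process(target=producer, args=(thequeue, ["hello", "hi", "bye"]))
    p.start()
    for item in consume(thequeue):
        if item == "hi":
            print(item)
    p.join()  # join only after the queue has been consumed
```

The consumer runs in the main process here only to keep the sketch short. It could equally be the target of a second mp.Process, as in the question.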

Upvotes: 14
