Reputation: 23980
I need to know when a Queue is closed and won't have more items, so I can end the iteration.
I did it by putting a sentinel in the queue:
from Queue import Queue

class IterableQueue(Queue):
    _sentinel = object()

    def __iter__(self):
        return self

    def close(self):
        self.put(self._sentinel)

    def next(self):
        item = self.get()
        if item is self._sentinel:
            raise StopIteration
        else:
            return item
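For illustration, a hedged usage sketch of the class above (Python 2 to match the code; the consumer function and item values are made up):

from threading import Thread

q = IterableQueue()

def consume():
    # the for-loop ends as soon as the sentinel is seen
    for item in q:
        print "got", item
    print "queue closed, consumer done"

t = Thread(target=consume)
t.start()

for i in range(3):
    q.put(i)
q.close()   # unblocks the consumer and ends its iteration
t.join()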
Given that this is a very common use for a queue, isn't there any builtin implementation?
Upvotes: 12
Views: 10812
Reputation: 373
I have implemented an asyncio.Queue compatible queue that can be iterated with an async for loop. See queutils.IterableQueue.
- asyncio.Queue interface
- AsyncIterable support: async for item in queue:
- QueueDone exception when the queue has been emptied
- Producers must be registered with add_producer(), and they must notify the queue with finish() once they have finished adding items
- count property
- pip install queutils
A producer is a "process" that adds items to the queue. A producer needs to be registered with the queue using the add_producer() coroutine. Once a producer has added all the items it intends to, it notifies the queue with finish():
from queutils import IterableQueue

async def producer(
    Q: IterableQueue[int], N: int
) -> None:
    # add this producer to the queue so it waits for us to finish
    await Q.add_producer()
    for i in range(N):
        await Q.put(i)
    # notify the queue that this producer does not add more
    await Q.finish()
    return None
A consumer is a "process" that takes items from the queue with the get() coroutine. Since IterableQueue is AsyncIterable, it can be iterated over with async for:
from queutils.iterablequeue import IterableQueue

async def consumer(Q: IterableQueue[int]):
    """
    Consume the queue
    """
    async for i in Q:
        print(f"consumer: got {i} from the queue")
    print("consumer: queue is done")
Upvotes: 0
Reputation: 751
An old question, and variations of self._sentinel = object()
will work. Revisiting this in 2021, I would instead suggest using concurrent.futures combined with None
as your sentinel:
# Note: this is Python 3.8+ code
import queue
import time
import functools
import random
from concurrent.futures import ThreadPoolExecutor

def worker(tup):
    (q, i) = tup
    print(f"Starting thread {i}")
    partial_sum = 0
    numbers_added = 0
    while True:
        try:
            item = q.get()
            if item is None:
                # 'propagate' this 'sentinel' to anybody else
                q.put(None)
                break
            numbers_added += 1
            partial_sum += item
            # need to pretend that we're doing something asynchronous
            time.sleep(random.random()/100)
        except Exception as e:
            print(f"(warning) Thread {i} got an exception {e}, that shouldn't happen.")
            break
    print(f"Thread {i} is done, saw a total of {numbers_added} numbers to add up")
    return partial_sum

MAX_RANGE = 1024
MAX_THREADS = 12

with ThreadPoolExecutor() as executor:
    # create a queue with numbers to add up
    (q := queue.Queue()).queue = queue.deque(range(MAX_RANGE))
    # kick off the threads
    future_partials = executor.map(worker, [(q, i) for i in range(MAX_THREADS)])
    # they'll be done more or less instantly, but we'll make them wait
    print("Threads launched with first batch ... sleeping 2 seconds")
    time.sleep(2)
    # threads are still available for more work!
    for i in range(MAX_RANGE):
        q.put(i)
    print("Finished giving them another batch, this time we're not sleeping")
    # now we tell them all to wrap it up
    q.put(None)
    # this will nicely catch the outputs
    sum = functools.reduce(lambda x, y: x + y, future_partials)
    print(f"Got total sum {sum} (correct answer is {(MAX_RANGE-1)*MAX_RANGE})")
# Starting thread 0
# Starting thread 1
# Starting thread 2
# Starting thread 3
# Starting thread 4
# Starting thread 5
# Starting thread 6
# Starting thread 7
# Starting thread 8
# Starting thread 9
# Starting thread 10
# Starting thread 11
# Threads launched with first batch ... sleeping 2 seconds
# Finished giving them another batch, this time we're not sleeping
# Thread 0 is done, saw a total of 175 numbers to add up
# Thread 3 is done, saw a total of 178 numbers to add up
# Thread 11 is done, saw a total of 173 numbers to add up
# Thread 4 is done, saw a total of 177 numbers to add up
# Thread 9 is done, saw a total of 169 numbers to add up
# Thread 1 is done, saw a total of 172 numbers to add up
# Thread 7 is done, saw a total of 162 numbers to add up
# Thread 10 is done, saw a total of 161 numbers to add up
# Thread 5 is done, saw a total of 169 numbers to add up
# Thread 2 is done, saw a total of 157 numbers to add up
# Thread 6 is done, saw a total of 169 numbers to add up
# Thread 8 is done, saw a total of 186 numbers to add up
# Got total sum 1047552 (correct answer is 1047552)
Note how the de facto 'master thread' just needs to push None
into the queue, similar to a condition variable 'signal', which the threads all pick up (and propagate).
Also, this does not use the multiprocessing Queue
, which is heavier-weight than the standard (thread-safe) queue. The above code also has the benefit of being easily modified to use ProcessPoolExecutor
, or hybrids of both (in either case, yes, you would need to use multiprocessing.Queue
).
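For example, here is a hedged sketch of the process-pool variant. It reuses the worker(), MAX_RANGE and MAX_THREADS definitions above, and uses a Manager().Queue() because a plain multiprocessing.Queue cannot be passed to pool workers as a map() argument:

import multiprocessing
from concurrent.futures import ProcessPoolExecutor

if __name__ == "__main__":
    with multiprocessing.Manager() as manager:
        q = manager.Queue()
        # preload the work and the sentinel; worker() propagates the None
        for i in range(MAX_RANGE):
            q.put(i)
        q.put(None)
        with ProcessPoolExecutor() as executor:
            future_partials = executor.map(worker, [(q, i) for i in range(MAX_THREADS)])
            total = sum(future_partials)
        print(f"Got total sum {total}")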
(Side note: generally speaking, if classes are needed to solve a "fundamental" issue in any given generation of Python, there are often new options in more modern versions.)
(Second side note: The only reason the code is Python 3.8+ is that I'm a fan of assignment expressions, which, in line with the above side note, resolve the historical issue of how to initialize a queue from a list without having to resort to non-functional solutions.)
Upvotes: 0
Reputation: 226296
A sentinel is a reasonable way for a producer to send a message that no more queue tasks are forthcoming.
FWIW, your code can be simplified quite a bit with the two-argument form of iter():
from Queue import Queue

class IterableQueue(Queue):
    _sentinel = object()

    def __iter__(self):
        return iter(self.get, self._sentinel)

    def close(self):
        self.put(self._sentinel)
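As a standalone illustration of the two-argument form (a hedged sketch using None as the sentinel instead of the private object above): iter(callable, sentinel) calls the callable repeatedly until it returns the sentinel.

from Queue import Queue   # named 'queue' in Python 3

q = Queue()
for i in range(3):
    q.put(i)
q.put(None)                 # sentinel

for item in iter(q.get, None):
    print(item)             # prints 0, 1, 2 and then the loop ends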
Upvotes: 15
Reputation: 92569
The multiprocessing module has its own version of Queue that does include a close
method. I am not sure how it works with threading, but it's worth a try. I don't see why it shouldn't work the same:
from multiprocessing import Queue
q = Queue()
q.put(1)
q.get_nowait()
# 1
q.close()
q.get_nowait()
# ...
# IOError: handle out of range in select()
You could just catch the IOError as the close signal.
TEST
from multiprocessing import Queue
from threading import Thread

def worker(q):
    while True:
        try:
            item = q.get(timeout=.5)
        except IOError:
            print "Queue closed. Exiting thread."
            return
        except:
            continue
        print "Got item:", item

q = Queue()
for i in xrange(3):
    q.put(i)

t = Thread(target=worker, args=(q,))
t.start()
# Got item: 0
# Got item: 1
# Got item: 2

q.close()
# Queue closed. Exiting thread.
Though to be honest, it's not too different from setting a flag on the Queue.Queue. The multiprocessing.Queue is just using a closed file descriptor as a flag:
from Queue import Queue
from threading import Thread

def worker2(q):
    while True:
        if q.closed:
            print "Queue closed. Exiting thread."
            return
        try:
            item = q.get(timeout=.5)
        except:
            continue
        print "Got item:", item

q = Queue()
q.closed = False
for i in xrange(3):
    q.put(i)

t = Thread(target=worker2, args=(q,))
t.start()
# Got item: 0
# Got item: 1
# Got item: 2

q.closed = True
# Queue closed. Exiting thread.
Upvotes: 4