Reputation: 858
Does anyone know a Pythonic way of iterating over the elements of a Queue.Queue without removing them from the queue? I have a producer/consumer-type program where items to be processed are passed using a Queue.Queue, and I want to be able to print what the remaining items are. Any ideas?
Upvotes: 46
Views: 67784
Reputation: 373
I have implemented an IterableQueue(asyncio.Queue) that supports async for iteration. See pyutils on GitHub.
from pyutils import IterableQueue
from asyncio import run, Task, create_task

async def producer(Q: IterableQueue[int], n: int) -> None:
    await Q.add_producer(N=1)
    for i in range(n):
        await Q.put(i)
    await Q.finish()
    return None

async def amain():
    q: IterableQueue[int] = IterableQueue(maxsize=5)
    task: Task = create_task(producer(q, 10))
    # Iterate over queue items
    async for i in q:
        print(f"Got {i}")

if __name__ == "__main__":
    run(amain())
IterableQueue() counts producers with add_producer(). Once the last producer finishes (finish()), a sentinel value (None) is added to the queue to mark its end.
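The sentinel idea described above can be sketched with nothing but the standard library. This is not the pyutils implementation; SentinelQueue, consume_all and the single-producer setup are hypothetical names chosen for illustration, and the sketch assumes None is never a real item. Note that iterating this way consumes the items.

```python
import asyncio

_SENTINEL = None  # assumption: None is never put on the queue as real data


class SentinelQueue(asyncio.Queue):
    """Hypothetical sketch: an asyncio.Queue usable with `async for`."""

    def __aiter__(self):
        return self

    async def __anext__(self):
        item = await self.get()
        if item is _SENTINEL:
            # The producer has signalled that no more items will arrive.
            raise StopAsyncIteration
        return item


async def producer(q, n):
    for i in range(n):
        await q.put(i)
    await q.put(_SENTINEL)  # mark the end of the queue


async def consume_all(n):
    q = SentinelQueue(maxsize=5)
    task = asyncio.create_task(producer(q, n))
    items = [item async for item in q]  # stops at the sentinel
    await task
    return items


received = asyncio.run(consume_all(10))
print(received)
```

With several producers, something would have to count them and enqueue the sentinel only after the last one finishes, which is the role the answer attributes to add_producer() and finish().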
Upvotes: 0
Reputation: 1
You can convert the deque into a list before printing the elements so that you can easily iterate through it.
from collections import deque
d = deque([7,9,3,5])
d.append(2)
d.appendleft(1)
d.append(10)
d.pop()
for elem in list(d):
    print(elem, end=" ")
# Output: 1 7 9 3 5 2
Upvotes: 1
Reputation: 15976
Listing queue elements without consuming them:
>>> from Queue import Queue
>>> q = Queue()
>>> q.put(1)
>>> q.put(2)
>>> q.put(3)
>>> print list(q.queue)
[1, 2, 3]
Afterwards, you can still process the items as usual:
>>> q.get()
1
>>> print list(q.queue)
[2, 3]
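The session above is Python 2. In Python 3 the module is named queue and print is a function; a minimal equivalent, with the variable names snapshot, first and remaining chosen here for illustration, might look like this:

```python
import queue

q = queue.Queue()
for item in (1, 2, 3):
    q.put(item)

# q.queue is the underlying collections.deque; copying it removes nothing.
snapshot = list(q.queue)
print(snapshot)

first = q.get()            # normal consumption still works afterwards
remaining = list(q.queue)
print(first, remaining)
```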
Upvotes: 24
Reputation: 18670
You can subclass queue.Queue to achieve this in a thread-safe way:
import queue

class ImprovedQueue(queue.Queue):
    def to_list(self):
        """
        Returns a copy of all items in the queue without removing them.
        """
        with self.mutex:
            return list(self.queue)
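A short usage sketch for this approach. The class is repeated so the example runs on its own; the worker function and the job names are hypothetical.

```python
import queue
import threading


class ImprovedQueue(queue.Queue):
    def to_list(self):
        """Return a copy of all items in the queue without removing them."""
        with self.mutex:
            return list(self.queue)


q = ImprovedQueue()
for job in ("a", "b", "c"):
    q.put(job)

pending = q.to_list()  # snapshot taken while holding the queue's lock
print("pending:", pending)


def worker():
    # Hypothetical consumer: drain the queue without blocking.
    while True:
        try:
            q.get(block=False)
        except queue.Empty:
            break
        q.task_done()


t = threading.Thread(target=worker)
t.start()
t.join()

after = q.to_list()
print("after worker:", after)
```

One design point to keep in mind: Queue.mutex is a plain, non-reentrant lock, so to_list() should not call public methods such as qsize() or empty() while holding it, as those acquire the same lock and would deadlock.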
Upvotes: 10
Reputation: 226296
You can loop over a copy of the underlying data store:
for elem in list(q.queue):
    print(elem)
Even though this bypasses the locks for Queue objects, the list copy is an atomic operation, so it should work out fine.
If you want to keep the locks, why not pull all the tasks out of the queue, make your list copy, and then put them back?
from queue import Empty  # Python 2: from Queue import Empty

mycopy = []
while True:
    try:
        elem = q.get(block=False)
    except Empty:
        break
    else:
        mycopy.append(elem)
for elem in mycopy:
    q.put(elem)
for elem in mycopy:
    # do something with the elements
    print(elem)
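The drain-and-refill idea can be wrapped in a helper. The sketch below is one possible version, with peek_all as a hypothetical name. It adds a task_done() call per drained item, because put() increments the queue's unfinished-task counter while get() does not decrement it; without that call, a later q.join() would wait for twice as many task_done() calls as there are items.

```python
import queue


def peek_all(q):
    """Hypothetical helper: drain the queue, copy the items, put them back."""
    items = []
    while True:
        try:
            items.append(q.get(block=False))
        except queue.Empty:
            break
        # Balance the counter that the upcoming put() will increment again.
        q.task_done()
    for item in items:
        q.put(item)
    return items


q = queue.Queue()
for n in (1, 2, 3):
    q.put(n)

seen = peek_all(q)
print(seen, q.qsize())
```

The helper is not atomic with respect to other threads: while it runs, consumers may briefly see an empty queue, and items added by producers in the meantime can end up ahead of the refilled ones.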
Upvotes: 52