Reputation: 1131
I'd like to do something like this (one queue, multiple consumers):
import gevent
from gevent import queue
q = queue.Queue()
q.put(1)
q.put(2)
q.put(3)
q.put(StopIteration)
def consumer(qq):
    for i in qq:
        print(i)
jobs = [gevent.spawn(consumer, i) for i in [q, q]]
gevent.joinall(jobs)
But that doesn't work: the queue is fully consumed by job1, so job2 blocks forever.
It gives me the exception gevent.hub.LoopExit: This operation would block forever.
I would like each consumer to be able to consume the full queue from the start (the output could be 1,2,3,1,2,3 or 1,1,2,2,3,3; the order doesn't matter).
One idea would be to clone the queue before spawning, but that isn't possible with the copy module (shallow or deep) ;-(
Is there another way to do that?
[EDIT] What do you think of this?
import gevent
from gevent import queue
class MasterQueueClonable(queue.Queue):
    def __init__(self, *a, **k):
        queue.Queue.__init__(self, *a, **k)
        self.__cloned = []
        self.__old = []
    # override
    def get(self, *a, **k):
        e = queue.Queue.get(self, *a, **k)
        for i in self.__cloned:
            i.put(e)  # serve to current clones
        self.__old.append(e)  # save old element
        return e
    def clone(self):
        q = queue.Queue()
        for i in self.__old:
            q.put(i)  # feed the new queue with elements that were already taken out
        self.__cloned.append(q)  # keep the clone so it also receives newer elements
        return q
q = MasterQueueClonable()
q.put(1)
q.put(2)
q.put(3)
q.put(StopIteration)
def consumer(qq):
    for i in qq:
        print(id(qq), i)
jobs = [gevent.spawn(consumer, i) for i in [q.clone(), q, q.clone(), q.clone()]]
gevent.joinall(jobs)
It's based on RyanYe's idea, but the "master queue" works without a dispatcher. My master queue overrides the get() method and forwards each element it returns to the clones created on demand. In addition, a "clone" can be created after consumption of the master queue has started (thanks to the __old trick).
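The replay idea behind the __old trick can be sketched without gevent. This is only an illustration under stated assumptions: the class name ReplayBroadcast and its publish()/subscribe() methods are hypothetical, and the fan-out happens when an item is published rather than inside an overridden get() as in the code above.

```python
from collections import deque


class ReplayBroadcast:
    """Hypothetical helper: fans each item out to every subscriber."""

    def __init__(self):
        self._history = []      # every item published so far
        self._subscribers = []  # one deque per subscriber

    def publish(self, item):
        # Remember the item, then hand it to all current subscribers.
        self._history.append(item)
        for sub in self._subscribers:
            sub.append(item)

    def subscribe(self):
        # A late subscriber starts with everything already published.
        sub = deque(self._history)
        self._subscribers.append(sub)
        return sub


hub = ReplayBroadcast()
early = hub.subscribe()
hub.publish(1)
hub.publish(2)
late = hub.subscribe()  # created after two items were already published
hub.publish(3)
```

Both subscribers end up holding the full sequence, which is the property the question asks for. The history list grows without bound, so a real implementation would need some way to cap or discard it.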
Upvotes: 2
Views: 1375
Reputation: 3269
I suggest creating a greenlet that dispatches the work to the consumers. Example code:
import gevent
from gevent import queue
master_queue = queue.Queue()
master_queue.put(1)
master_queue.put(2)
master_queue.put(3)
master_queue.put(StopIteration)
total_consumers = 10
consumer_queues = [queue.Queue() for i in range(total_consumers)]
def dispatcher(master_queue, consumer_queues):
    for i in master_queue:
        [j.put(i) for j in consumer_queues]
    # the loop above ends at StopIteration without forwarding it, so do it here
    [j.put(StopIteration) for j in consumer_queues]
def consumer(qq):
    for i in qq:
        print(i)
jobs = [gevent.spawn(dispatcher, master_queue, consumer_queues)] + [gevent.spawn(consumer, i) for i in consumer_queues]
gevent.joinall(jobs)
UPDATE: Fixed the missing StopIteration for the consumer queues. Thanks to arilou for pointing it out.
Upvotes: 2
Reputation: 3780
I've added a copy() method to the Queue class:
>>> import gevent.queue
>>> q = gevent.queue.Queue()
>>> q.put(5)
>>> q.copy().get()
5
>>> q
<Queue at 0x1062760d0 queue=deque([5])>
Let me know if it helps.
Upvotes: 1
Reputation: 475
In Ryan Ye's answer, one line is missing at the end of the dispatcher() function: [j.put(StopIteration) for j in consumer_queues]. Without it we still get 'gevent.hub.LoopExit: This operation would block forever', because the 'for i in master_queue' loop ends at StopIteration without copying it into the consumer_queues.
(Sorry, I can't leave comments yet, so I'm writing this as a separate answer.)
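To show why the end marker has to be forwarded, here is a minimal sketch of the same dispatcher pattern. It deliberately uses the standard library's queue and threading modules instead of gevent so that it runs without extra dependencies, and SENTINEL is a hypothetical stand-in for the StopIteration marker used above.

```python
import queue
import threading

SENTINEL = object()  # hypothetical end-of-stream marker (stands in for StopIteration)


def dispatcher(master, consumer_queues):
    # Copy every item from the master queue to every consumer queue.
    while True:
        item = master.get()
        if item is SENTINEL:
            break
        for cq in consumer_queues:
            cq.put(item)
    # Forward the end marker too; without this step the consumers never stop.
    for cq in consumer_queues:
        cq.put(SENTINEL)


def consumer(cq, out):
    # Collect items until the end marker arrives.
    while True:
        item = cq.get()
        if item is SENTINEL:
            break
        out.append(item)


master = queue.Queue()
for value in (1, 2, 3, SENTINEL):
    master.put(value)

consumer_queues = [queue.Queue() for _ in range(3)]
results = [[] for _ in consumer_queues]

threads = [threading.Thread(target=dispatcher, args=(master, consumer_queues))]
threads += [
    threading.Thread(target=consumer, args=(cq, out))
    for cq, out in zip(consumer_queues, results)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

If the final loop in dispatcher() is removed, every consumer blocks in cq.get() forever and the join never returns, which is the threaded counterpart of the LoopExit error.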
Upvotes: 0