manatlan

Reputation: 1131

Multiple consumers, is it possible to clone a queue (gevent)?

I'd like to do something like this (one queue, multiple consumers):

import gevent
from gevent import queue

q=queue.Queue()
q.put(1)
q.put(2)
q.put(3)
q.put(StopIteration)

def consumer(qq):
    for i in qq:
        print i

jobs=[gevent.spawn(consumer,i) for i in [q,q]]

gevent.joinall(jobs)

But this doesn't work: the queue is consumed entirely by the first job, so the second job blocks forever, and I get the exception gevent.hub.LoopExit: This operation would block forever.
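To illustrate why sharing one queue does not work, here is a minimal sketch of the underlying behaviour: a queue hands each item to exactly one getter. It uses the standard library's `queue.Queue` rather than gevent (an assumption made so it runs without gevent installed); `drain` is a hypothetical helper, not part of either library.

```python
import queue  # standard library; used here in place of gevent.queue

q = queue.Queue()
for item in (1, 2, 3):
    q.put(item)

def drain(source):
    """Hypothetical helper: collect everything currently queued, without blocking."""
    seen = []
    while True:
        try:
            seen.append(source.get_nowait())
        except queue.Empty:
            return seen

first = drain(q)   # the first consumer takes every item
second = drain(q)  # nothing is left for the second consumer
print(first, second)
```

With a blocking `get()` instead of `get_nowait()`, the second consumer would wait indefinitely, which is the situation gevent reports as LoopExit.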

I would like each consumer to be able to consume the full queue from the start. (It should display 1,2,3,1,2,3 or 1,1,2,2,3,3; the order doesn't matter.)

One idea would be to clone the queue before spawning, but that's not possible with the copy module (shallow or deep) ;-(

Is there another way to do that?

[EDIT] What do you think of this?

import gevent
from gevent import queue

class MasterQueueClonable(queue.Queue):
    def __init__(self,*a,**k):
        queue.Queue.__init__(self,*a,**k)

        self.__cloned = []
        self.__old=[]

    #override
    def get(self,*a,**k):
        e=queue.Queue.get(self,*a,**k)
        for i in self.__cloned:  i.put(e) # serve to current clones
        self.__old.append(e)              # save old element
        return e

    def clone(self):
        q=queue.Queue()
        for i in self.__old: q.put(i)   # feed a queue with elements which are out
        self.__cloned.append(q)         # stock the queue, to be able to put newer elements too
        return q

q=MasterQueueClonable()
q.put(1)
q.put(2)
q.put(3)
q.put(StopIteration)

def consumer(qq):
    for i in qq:
        print id(qq),i

jobs=[gevent.spawn(consumer,i) for i in [q.clone(), q ,q.clone(),q.clone()]]
gevent.joinall(jobs)

It's based on Ryan Ye's idea. There is a "master queue" but no dispatcher: my master queue overrides the get method and dispatches each element to clones created on demand. What's more, a clone can be created after the master queue has started being consumed (thanks to the __old trick).
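One limitation of dispatching inside get is that clones only receive elements once something reads the master queue. A possible variation, sketched here under assumptions, is to fan out on put instead and keep a history for late subscribers. `BroadcastQueue`, `subscribe`, `drain` and `DONE` are hypothetical names, and the sketch uses the standard library's `queue.Queue` rather than gevent so it runs as-is.

```python
import queue  # standard library; used here in place of gevent.queue

class BroadcastQueue:
    """Hypothetical helper: every put() is copied to all subscriber queues."""

    def __init__(self):
        self._history = []      # everything published so far, for late subscribers
        self._subscribers = []  # one private queue per consumer

    def put(self, item):
        self._history.append(item)
        for sub in self._subscribers:
            sub.put(item)

    def subscribe(self):
        sub = queue.Queue()
        for item in self._history:  # replay what was already published
            sub.put(item)
        self._subscribers.append(sub)
        return sub

DONE = object()  # sentinel marking the end of the stream

def drain(sub):
    """Read one subscriber queue up to the sentinel."""
    items = []
    while True:
        item = sub.get()
        if item is DONE:
            return items
        items.append(item)

bq = BroadcastQueue()
early = bq.subscribe()
bq.put(1)
bq.put(2)
late = bq.subscribe()  # joins after 1 and 2 were published
bq.put(3)
bq.put(DONE)

early_items = drain(early)
late_items = drain(late)
print(early_items, late_items)
```

Note that the history list grows without bound and the class takes no locks, so this is only a starting point, not a drop-in replacement.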

Upvotes: 2

Views: 1375

Answers (3)

Ryan Ye

Reputation: 3269

I suggest creating a greenlet to dispatch the work to the consumers. Example code:

import gevent
from gevent import queue

master_queue=queue.Queue()
master_queue.put(1)
master_queue.put(2)
master_queue.put(3)
master_queue.put(StopIteration)

total_consumers = 10
consumer_queues = [queue.Queue() for i in xrange(total_consumers)]

def dispatcher(master_queue, consumer_queues):
    for i in master_queue:
        [j.put(i) for j in consumer_queues]
    [j.put(StopIteration) for j in consumer_queues]

def consumer(qq):
    for i in qq:
        print i

jobs=[gevent.spawn(dispatcher, master_queue, consumer_queues)] + [gevent.spawn(consumer, i) for i in consumer_queues]
gevent.joinall(jobs)

UPDATE: Fixed the missing StopIteration for the consumer queues. Thanks to arilou for pointing it out.
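For readers on Python 3, the same dispatcher pattern can be sketched with the standard library alone. This swaps gevent greenlets for threads and StopIteration for an explicit sentinel (both are assumptions for illustration, not the code from this answer); `DONE` and `results` are hypothetical names.

```python
import queue
import threading

DONE = object()  # sentinel marking the end of the stream

def dispatcher(master, consumer_queues):
    """Copy every item from the master queue into each consumer queue."""
    while True:
        item = master.get()
        for cq in consumer_queues:
            cq.put(item)  # the sentinel is forwarded too, so consumers can stop
        if item is DONE:
            return

def consumer(cq, out):
    """Collect items from a private queue until the sentinel arrives."""
    while True:
        item = cq.get()
        if item is DONE:
            return
        out.append(item)

master = queue.Queue()
for item in (1, 2, 3, DONE):
    master.put(item)

consumer_queues = [queue.Queue() for _ in range(3)]
results = [[] for _ in consumer_queues]

threads = [threading.Thread(target=dispatcher, args=(master, consumer_queues))]
threads += [
    threading.Thread(target=consumer, args=(cq, out))
    for cq, out in zip(consumer_queues, results)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(results)
```

Each consumer sees the full sequence because it reads from its own queue; only the dispatcher touches the master queue.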

Upvotes: 2

Denis

Reputation: 3780

I've added a copy() method to the Queue class:

>>> import gevent.queue
>>> q = gevent.queue.Queue()
>>> q.put(5)
>>> q.copy().get()
5
>>> q
<Queue at 0x1062760d0 queue=deque([5])>

Let me know if it helps.

Upvotes: 1

arilou

Reputation: 475

In Ryan Ye's answer, one line is missing at the end of the dispatcher() function: [j.put(StopIteration) for j in consumer_queues]. Without it we still get 'gevent.hub.LoopExit: This operation would block forever', since the 'for i in master_queue' loop doesn't copy the StopIteration sentinel into the consumer_queues.

(Sorry, I can't leave comments yet, so I'm writing this as a separate answer.)

Upvotes: 0
