Mridang Agarwalla

Reputation: 45078

How can I implement a multi-producer, multi-consumer paradigm in Gevent?

I have some producer functions that rely on I/O-heavy blocking calls, and some consumer functions that also rely on I/O-heavy blocking calls. To speed them up, I used the Gevent micro-threading library as glue.

Here's what my paradigm looks like:

import gevent
from gevent.queue import *
import time
import random

q = JoinableQueue()
workers = []
producers = []

def do_work(wid, value):
    gevent.sleep(random.randint(0,2))
    print 'Task', value, 'done', wid

def worker(wid):
    while True:
        item = q.get()
        try:
            print "Got item %s" % item
            do_work(wid, item)
        finally:
            print "No more items"
            q.task_done()


def producer():
    while True:
        item = random.randint(1, 11)
        if item == 10:
            print "Signal Received"
            return
        else:
            print "Added item %s" % item
            q.put(item)



for i in range(4):
    workers.append(gevent.spawn(worker, random.randint(1, 100000)))

# This doesn't work.
for j in range(2):
    producers.append(gevent.spawn(producer))

#Uncommenting this makes this script work.
#producer()

q.join()

I have four consumers and would like to have two producers. The producers exit when they receive a signal, i.e. 10. The consumers keep feeding off this queue, and the whole task finishes when the producers and consumers are done.

However, this doesn't work. If I comment out the for loop which spawns multiple producers and use only a single producer, the script runs fine.

I can't seem to figure out what I've done wrong.

Any ideas?

Thanks

Upvotes: 5

Views: 2306

Answers (4)

Ryan Chou

Reputation: 1132

I ran into the same issue. The main problem with your code is that the producers are spawned as gevent greenlets, so the workers can't get a task immediately.

I suggest running producer() in the main process rather than spawning it in a gevent thread. That way, when execution reaches producer(), the tasks are pushed onto the queue immediately.

import gevent
from gevent.queue import *
import time
import random

q = JoinableQueue()
workers = []
producers = []

def do_work(wid, value):
    gevent.sleep(random.randint(0,2))
    print 'Task', value, 'done', wid

def worker(wid):
    while True:
        item = q.get()
        try:
            print "Got item %s" % item
            do_work(wid, item)
        finally:
            print "No more items"
            q.task_done()


def producer():
    while True:
        item = random.randint(1, 11)
        if item == 10:
            print "Signal Received"
            return
        else:
            print "Added item %s" % item
            q.put(item)


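# Fill the queue from the main greenlet before any worker starts.
producer()

for i in range(4):
    workers.append(gevent.spawn(worker, random.randint(1, 100000)))

# Block until every queued item has been marked done; without this the
# script exits before the spawned workers get a chance to run.
q.join()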

The code above makes sense. :)

Upvotes: 0

Stephen Diehl

Reputation: 8419

What you want to do is block the main program while the producers and workers communicate. Blocking on the queue only waits until the queue is empty and then yields, which could be immediately. Put this at the end of your program instead of q.join():

gevent.joinall(producers)

Upvotes: 0

Ivo

Reputation: 5420

You don't actually want to quit when the queue has no unfinished work, because conceptually that's not when the application should finish.

You want to quit when the producers have finished, and then when there is no unfinished work.

# Wait for all producers to finish producing
gevent.joinall(producers)
# *Now* we want to make sure there's no unfinished work
q.join()
# We don't care about workers. We weren't paying them anything, anyways
gevent.killall(workers)
# And, we're done.
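
For reference, here is a minimal sketch of how that shutdown order might fit into a complete script. It is written for Python 3, and it assumes fixed item batches instead of the random "signal" from the question so that the run is deterministic; the `done` list and the `items` parameter are names made up for this illustration, and `gevent.sleep(0)` merely stands in for the real blocking I/O.

```python
import gevent
from gevent.queue import JoinableQueue

q = JoinableQueue()
done = []  # hypothetical record of (worker id, item) pairs, for illustration only

def worker(wid):
    # Consume forever; the main greenlet kills us once all work is finished.
    while True:
        item = q.get()
        try:
            gevent.sleep(0)  # stand-in for a blocking I/O call
            done.append((wid, item))
        finally:
            q.task_done()

def producer(items):
    # Put a fixed batch on the queue, yielding between puts so that
    # workers and the other producer can interleave.
    for item in items:
        q.put(item)
        gevent.sleep(0)

workers = [gevent.spawn(worker, wid) for wid in range(4)]
producers = [gevent.spawn(producer, batch) for batch in ([1, 2, 3], [4, 5, 6])]

gevent.joinall(producers)  # 1. wait until nothing more will be produced
q.join()                   # 2. wait until every queued item is marked done
gevent.killall(workers)    # 3. stop the workers, which are idle in q.get()

print(sorted(item for _, item in done))
```

By the time killall runs, the workers are blocked in q.get() outside the try block, so killing them does not trigger a stray task_done() call.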

Upvotes: 6

zch

Reputation: 15278

I think it calls q.join() before anything is put in the queue, so it exits immediately. Try joining all producers before joining the queue.
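
A small sketch that should show the effect, assuming Python 3 and a single trivial producer (the `ran` list and the `started_before_join_returned` flag are made up for this illustration): gevent.spawn only schedules the greenlet, and q.join() on a queue with no unfinished tasks returns without yielding to it.

```python
import gevent
from gevent.queue import JoinableQueue

q = JoinableQueue()
ran = []  # hypothetical marker list recording whether the producer has run

def producer():
    ran.append("producer")
    q.put(1)

g = gevent.spawn(producer)  # scheduled, but not started until we yield
q.join()                    # nothing is unfinished yet, so this returns at once
started_before_join_returned = bool(ran)

g.join()                    # joining the producer lets it actually run
print(started_before_join_returned, ran)
```

If the script ended right after q.join(), the producer would never have run, which matches the behaviour described in the question.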

Upvotes: 3
