Mridang Agarwalla

Reputation: 45078

Gevent threads don't finish even though all the Queue items are exhausted

I'm trying to set up a simple producer-consumer system in Gevent but my script doesn't exit:

import gevent
from gevent.queue import *
import time
import random

q = Queue()
workers = []

def do_work(wid, value):
    """
    Actual blocking function
    """
    gevent.sleep(random.randint(0,2))
    print 'Task', value, 'done', wid
    return


def worker(wid):
    """
    Consumer
    """
    while True:
        item = q.get()
        do_work(wid, item)


def producer():
    """
    Producer
    """
    for i in range(4):
        workers.append(gevent.spawn(worker, random.randint(1, 100000)))


    for item in range(1, 9):
         q.put(item)

producer()
gevent.joinall(workers)

I haven't been able to find good examples or tutorials on using Gevent, so what I've pasted above is what I've cobbled together from the internet.

Multiple workers start and the items go into the queue, but even after every item in the queue has been processed, the main program doesn't exit. I have to press Ctrl+C.

What am I doing wrong?

Thanks.

On a side note: if there is anything in my script that could be improved, please let me know. Simple things like checking when the Queue is empty, etc.

Upvotes: 6

Views: 4803

Answers (2)

Ivo

Reputation: 5420

In your worker, you activate a loop that will run forever.

As a side note, an imho more elegant "forever loop" can be written with just:

for work_unit in q:
    # Do work, etc

gevent.joinall() waits for the workers to finish; but they never do, so your program will forever be waiting. This is what causes it to not exit.

If you don't care about the workers anymore, you can just kill them instead:

gevent.killall(workers)

An alternative is to put a 'special' item in the queue. When a worker receives this item, it recognises it as different from normal work and stops working.

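# Producer side: queue one stop marker per worker
for worker in workers:
    q.put("TimeToDie")

# Worker side: stop when the marker arrives
for work_unit in q:
    if work_unit == "TimeToDie":
        break
    do_work(wid, work_unit)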
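Putting the two halves together, a minimal runnable sketch of the stop-marker idea might look like the following. The names STOP, NUM_WORKERS and results are made up for illustration, and gevent.sleep(0) merely stands in for real work; the loop uses a plain q.get() rather than iterating over the queue.

```python
import gevent
from gevent.queue import Queue

NUM_WORKERS = 4          # hypothetical worker count, matching the question
STOP = object()          # hypothetical sentinel; any unique value would do

q = Queue()
results = []             # illustrative: records (worker id, item) pairs

def worker(wid):
    while True:
        item = q.get()
        if item is STOP:
            break                    # leave the loop so the greenlet can finish
        gevent.sleep(0)              # stand-in for the real blocking work
        results.append((wid, item))

workers = [gevent.spawn(worker, wid) for wid in range(NUM_WORKERS)]

for item in range(1, 9):
    q.put(item)

# One sentinel per worker: each worker consumes exactly one and then exits.
for _ in workers:
    q.put(STOP)

gevent.joinall(workers)              # returns, because every worker has exited
print(sorted(item for _, item in results))
```

Because the sentinels are queued after the real items, every item is handed to a worker before any worker sees its stop marker.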

Or you could use gevent's Event to signal the workers that it is time to stop.
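One possible way to do the Event variant is sketched below. It assumes the workers poll the queue with a short timeout, since a worker blocked in a plain q.get() would never notice the event; the 0.1 second timeout and the names stop and done are arbitrary choices for the example.

```python
import gevent
from gevent.event import Event
from gevent.queue import Queue, Empty

q = Queue()
stop = Event()           # set by the producer once nothing more will be queued
done = []                # illustrative: collects the processed items

def worker(wid):
    while True:
        try:
            # Poll with a timeout so the stop flag is checked regularly.
            item = q.get(timeout=0.1)
        except Empty:
            if stop.is_set():
                break                # queue is drained and producer is finished
            continue
        done.append(item)            # stand-in for the real work

workers = [gevent.spawn(worker, wid) for wid in range(4)]

for item in range(1, 9):
    q.put(item)

stop.set()                           # signal: no more items are coming
gevent.joinall(workers)
print(sorted(done))
```

The workers only exit once the queue is empty and the event is set, so no queued item is skipped. The cost is up to one timeout of extra waiting at shutdown.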

Upvotes: 2

reclosedev

Reputation: 9522

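I think you should use JoinableQueue, as in the example from the documentation. Each worker calls q.task_done() after handling an item, and q.join() returns once every queued item has been marked done, so the main program no longer waits on workers that never finish.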

import gevent
from gevent.queue import JoinableQueue
import random

q = JoinableQueue()
workers = []

def do_work(wid, value):
    gevent.sleep(random.randint(0,2))
    print 'Task', value, 'done', wid

def worker(wid):
    while True:
        item = q.get()
        try:
            do_work(wid, item)
        finally:
            q.task_done()


def producer():
    for i in range(4):
        workers.append(gevent.spawn(worker, random.randint(1, 100000)))

    for item in range(1, 9):
        q.put(item)

producer()
q.join()
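Note that after q.join() returns, the workers are still blocked in q.get(). That is harmless when the script exits right away, but in a longer-running program you may want to clean them up. A rough sketch combining q.join() with gevent.killall() follows; the processed list is illustrative only and gevent.sleep(0) stands in for real work.

```python
import gevent
from gevent.queue import JoinableQueue

q = JoinableQueue()
processed = []           # illustrative: collects the handled items

def worker(wid):
    while True:
        item = q.get()
        try:
            gevent.sleep(0)          # stand-in for the real blocking work
            processed.append(item)
        finally:
            q.task_done()            # always mark the item as handled

workers = [gevent.spawn(worker, wid) for wid in range(4)]

for item in range(1, 9):
    q.put(item)

q.join()                 # blocks until task_done() was called for every item
gevent.killall(workers)  # workers are idle in q.get(); stop them explicitly
print(sorted(processed))
```

Since q.get() sits outside the try block, killing an idle worker does not trigger a stray task_done() call.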

Upvotes: 5
