Aefix

Reputation: 177

Queue doesn't process all elements when there are many threads

I have noticed that when I have many threads pulling elements from a queue, fewer elements are processed than the number I put into the queue. This is sporadic, but it happens roughly half the time when I run the following code.

#!/bin/env python

from threading import Thread
import httplib, sys
from Queue import Queue
import time
import random

concurrent = 500
num_jobs = 500

results = {}

def doWork():
    while True:
        result = None
        try:
            result = curl(q.get())
        except Exception as e:
            print "Error when trying to get from queue: {0}".format(str(e))

        if results.has_key(result):
            results[result] += 1
        else:
            results[result] = 1

        try:
            q.task_done()
        except:
            print "Called task_done when all tasks were done"

def curl(ourl):
    result = 'all good'
    try:
        time.sleep(random.random() * 2)
    except Exception as e:
        result = "error: %s" % str(e)
    except:
        result = str(sys.exc_info()[0])
    finally: 
        return result or "None"

print "\nRunning {0} jobs on {1} threads...".format(num_jobs, concurrent)

q = Queue()

for i in range(concurrent):
    t = Thread(target=doWork)
    t.daemon = True
    t.start()

for x in range(num_jobs):
    q.put("something")

try:
    q.join()
except KeyboardInterrupt:
    sys.exit(1)

total_responses = 0
for result in results:
    num_responses = results[result]
    print "{0}: {1} time(s)".format(result, num_responses)
    total_responses += num_responses

print "Number of elements processed: {0}".format(total_responses)

Upvotes: 3

Views: 252

Answers (1)

Adam Smith

Reputation: 54193

Tim Peters hit the nail on the head in the comments. The issue is that the results dict is updated from multiple threads without any mutex protecting it. That allows something like this to happen:

thread A gets result: "all good"
thread A checks results[result]
thread A sees no such key
thread A suspends  # <-- before counting its result
thread B gets result: "all good"
thread B checks results[result]
thread B sees no such key
thread B sets results['all good'] = 1
thread C ...
thread C sets results['all good'] = 2
thread D ...
thread A resumes  # <-- and remembers it needs to count its result still
thread A sets results['all good'] = 1  # resetting previous work!
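A minimal sketch of the fix, written in Python 3 syntax (`queue` instead of `Queue`, `print()` as a function): guard the check-then-set with a `threading.Lock` so no two workers can interleave inside it. The worker body here is simplified to just counting; the names are illustrative, not from the original post.

```python
import threading
from queue import Queue

results = {}
results_lock = threading.Lock()  # serializes all updates to the shared dict

def count_result(result):
    # With the lock held, the read-modify-write below is atomic with
    # respect to the other workers, so no increment can be lost.
    with results_lock:
        results[result] = results.get(result, 0) + 1

q = Queue()

def worker():
    while True:
        item = q.get()
        count_result(item)
        q.task_done()

for _ in range(50):
    threading.Thread(target=worker, daemon=True).start()

for _ in range(500):
    q.put("all good")

q.join()
print(results["all good"])  # 500 -- every put is counted exactly once
```

Without the lock, the same run intermittently prints fewer than 500, which is exactly the symptom in the question.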

A more typical workflow might have a results queue that the main thread is listening on.

workq = queue.Queue()
resultsq = queue.Queue()

make_work(into=workq)
do_work(work_from=workq, respond_on=resultsq)
# do_work would do respond_on.put_nowait(result) instead of
#   return result
# (the parameter can't literally be called `from` -- that's a keyword)

results = {}

while True:
    try:
        result = resultsq.get(timeout=5)
    except queue.Empty:
        break  # maybe? You'd probably want to retry a few times
    results[result] = results.get(result, 0) + 1
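Filling in that sketch as a runnable Python 3 program (the worker body and the queue names follow the outline above; `NUM_JOBS` and the draining loop are assumptions for illustration). Because `workq.join()` only returns after every worker has called `task_done()`, and each worker puts its result before that call, the main thread can drain `resultsq` non-blockingly afterwards:

```python
import queue
import threading

workq = queue.Queue()
resultsq = queue.Queue()

NUM_JOBS = 100
NUM_WORKERS = 10

def do_work(work_from, respond_on):
    while True:
        item = work_from.get()
        respond_on.put_nowait("all good")  # instead of `return result`
        work_from.task_done()

for _ in range(NUM_WORKERS):
    threading.Thread(target=do_work, args=(workq, resultsq), daemon=True).start()

for _ in range(NUM_JOBS):
    workq.put("something")

workq.join()  # all jobs processed, so resultsq is fully populated

# Only the main thread touches `results`, so no lock is needed here.
results = {}
while True:
    try:
        result = resultsq.get_nowait()
    except queue.Empty:
        break
    results[result] = results.get(result, 0) + 1

print(results)  # {'all good': 100}
```

The key point of this pattern is that the unsynchronized dict from the question is now owned by a single thread, which removes the race entirely.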

Upvotes: 1
