ChrisArmstrong

Reputation: 2521

Thread/Queue hanging issue

Novice to threading here. I'm borrowing a lot of the code from this thread while trying to build my first script using threading/queue:

import threading, urllib2
import Queue
import io
from PIL import Image

def avhash(url, queue):
    # accept either an already-open image or a path/URL
    if isinstance(url, Image.Image):
        im = url
    else:
        try:
            im = Image.open(url)
        except IOError:
            fd = urllib2.urlopen(url)
            im = Image.open(io.BytesIO(fd.read()))
    im = im.resize((8, 8), Image.ANTIALIAS).convert('L')
    avg = reduce(lambda x, y: x + y, im.getdata()) / 64.
    hash = reduce(lambda x, (y, z): x | (z << y),
                  enumerate(map(lambda i: 0 if i < avg else 1, im.getdata())),
                  0)

    queue.put({url: hash})

def fetch_parallel(job_list):
    q = Queue.Queue()
    threads = [threading.Thread(target=avhash, args = (job,q)) for job in job_list[0:50]]
    for t in threads:
        t.daemon = True
        t.start()

    for t in threads:
        t.join()
    return [q.get() for _ in xrange(len(job_list))]

In this case job_list is a list of URLs. I've found that this code works fine when the list has 50 or fewer URLs, but it hangs when there are more than 50. Is there something fundamental I'm not understanding about how threading works?

Upvotes: 0

Views: 266

Answers (1)

pillmuncher

Reputation: 10162

Your problem is this line:

return [q.get() for _ in xrange(len(job_list))]

If job_list has more than 50 elements, then you try to read more results from your queue than you have put in. Each of those extra q.get() calls blocks forever, waiting for a result that no thread will ever produce. Therefore:

return [q.get() for _ in xrange(len(job_list[:50]))]

or, even better:

MAX_LEN = 50
...
threads = [... for job in job_list[:MAX_LEN]]
...
return [q.get() for _ in job_list[:MAX_LEN]]
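
A quick way to confirm that the unmatched q.get() calls are the culprit is to give get() a timeout, so the missing results raise Queue.Empty instead of blocking forever. A minimal diagnostic sketch (the drain helper is hypothetical, just for illustration):

def drain(q, job_list):
    # collect as many results as actually arrive; get() raises
    # Queue.Empty after 5 seconds instead of hanging indefinitely
    results = []
    for _ in xrange(len(job_list)):
        try:
            results.append(q.get(timeout=5))
        except Queue.Empty:
            print 'only %d results ever arrived' % len(results)
            break
    return results

With more than 50 jobs, this prints "only 50 results ever arrived", which points straight at the mismatch.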

[EDIT]

It seems you want your program to do something different from what it actually does. Your program takes the first 50 entries in job_list, handles each of them in a thread, and disregards all other jobs. From your comment below I assume you want to handle all jobs, but only 50 at a time. For this you should use a thread pool. In Python >= 3.2 you could use concurrent.futures.ThreadPoolExecutor (https://docs.python.org/3/library/concurrent.futures.html).
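
For illustration, a minimal sketch of how that could look; it assumes avhash is refactored to take a URL and return the {url: hash} dict instead of writing to a queue:

from concurrent.futures import ThreadPoolExecutor

def fetch_parallel(job_list):
    # the pool runs at most 50 threads at a time, but every job
    # in job_list is eventually handled
    with ThreadPoolExecutor(max_workers=50) as executor:
        return list(executor.map(avhash, job_list))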

In Python < 3.2 you have to roll your own:

CHUNK_SIZE = 50

def fetch_parallel(job_list):
    results = []
    queue = Queue.Queue()
    while job_list:
        # start at most CHUNK_SIZE threads for the next slice of jobs
        threads = [threading.Thread(target=avhash, args=(job, queue))
                   for job in job_list[:CHUNK_SIZE]]
        job_list = job_list[CHUNK_SIZE:]
        for thread in threads:
            thread.daemon = True
            thread.start()
        # wait for the whole chunk to finish before starting the next one
        for thread in threads:
            thread.join()
        # exactly one result per thread, so these get() calls never block
        results.extend(queue.get() for _ in threads)
    return results

(untested)
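
One drawback of this chunked approach is that each batch waits for its slowest download before the next batch starts. An alternative, sketched here under the same assumption that every avhash call succeeds and puts exactly one result on the queue, is a fixed set of worker threads pulling jobs from a shared input queue:

NUM_WORKERS = 50

def fetch_parallel(job_list):
    in_q, out_q = Queue.Queue(), Queue.Queue()
    for job in job_list:
        in_q.put(job)

    def worker():
        while True:
            try:
                job = in_q.get_nowait()
            except Queue.Empty:
                return  # input queue drained, let the thread exit
            avhash(job, out_q)

    threads = [threading.Thread(target=worker) for _ in xrange(NUM_WORKERS)]
    for t in threads:
        t.daemon = True
        t.start()
    for t in threads:
        t.join()
    return [out_q.get() for _ in job_list]

(also untested)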

[/EDIT]

Upvotes: 0
