Reputation: 2521
Novice to threading here. I'm borrowing a lot of the code from this thread while trying to build my first script using threading/queue:
import threading, urllib2
import Queue
import sys
from PIL import Image
import io, sys

def avhash(url, queue):
    if not isinstance(url, Image.Image):
        try:
            im = Image.open(url)
        except IOError:
            fd = urllib2.urlopen(url)
            image_file = io.BytesIO(fd.read())
            im = Image.open(image_file)
    im = im.resize((8, 8), Image.ANTIALIAS).convert('L')
    avg = reduce(lambda x, y: x + y, im.getdata()) / 64.
    hash = reduce(lambda x, (y, z): x | (z << y),
                  enumerate(map(lambda i: 0 if i < avg else 1, im.getdata())),
                  0)
    queue.put({url: hash})
    queue.task_done()
def fetch_parallel(job_list):
    q = Queue.Queue()
    threads = [threading.Thread(target=avhash, args=(job, q)) for job in job_list[0:50]]
    for t in threads:
        t.daemon = True
        t.start()
    for t in threads:
        t.join()
    return [q.get() for _ in xrange(len(job_list))]
In this case job_list is a list of URLs. I've found that this code works fine when the list contains 50 URLs or fewer, but it hangs when there are more than 50. Is there something I'm fundamentally not understanding about how threading works?
Upvotes: 0
Views: 266
Reputation: 10162
Your problem is this line:
return [q.get() for _ in xrange(len(job_list))]
If job_list has more than 50 elements, then you try to read more results from your queue than you have put in. Therefore:
return [q.get() for _ in xrange(len(job_list[:50]))]
or, even better:
MAX_LEN = 50
...
threads = [... for job in job_list[:MAX_LEN]]
...
return [q.get() for _ in job_list[:MAX_LEN]]
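To see why the original version hangs rather than crashing: Queue.get() with no arguments blocks until an item arrives, so once the 50 available results are consumed, the 51st call waits forever. A minimal sketch (written to run under both module names, since the module is Queue in Python 2 and queue in Python 3):

```python
try:
    import Queue as queue_mod   # Python 2 name
except ImportError:
    import queue as queue_mod   # Python 3 name

q = queue_mod.Queue()
q.put("result from a worker")

q.get()                         # returns the one queued item immediately
try:
    # the queue is now empty: without a timeout this call would block
    # forever, which is exactly the hang seen with more than 50 jobs
    q.get(timeout=0.5)
except queue_mod.Empty:
    print("no more results -- a plain q.get() would block forever here")
```

With a timeout the call raises Queue.Empty instead of hanging, which makes the failure visible.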
[EDIT]
It seems you want your program to do something different than what it does. Your program takes the first 50 entries in job_list, handles each of them in a thread, and disregards all other jobs. From your comment below I assume you want to handle all jobs, but only 50 at a time. For this, you should use a thread pool. In Python >= 3.2 you can use concurrent.futures.ThreadPoolExecutor.
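For example, a sketch of the Python >= 3.2 approach (avhash_stub is a hypothetical stand-in for your avhash, since the real one needs PIL and network access; note the executor collects return values itself, so no shared queue is needed):

```python
from concurrent.futures import ThreadPoolExecutor

def avhash_stub(url):
    # stand-in for the real avhash: returns its result instead of
    # pushing it onto a shared queue
    return {url: len(url)}

def fetch_parallel(job_list):
    # at most 50 threads run at once; every job is handled eventually
    with ThreadPoolExecutor(max_workers=50) as pool:
        return list(pool.map(avhash_stub, job_list))

urls = ["http://example.com/%d.jpg" % i for i in range(120)]
print(len(fetch_parallel(urls)))   # all 120 jobs handled, 50 at a time
```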
In Python < 3.2 you have to roll your own:
CHUNK_SIZE = 50

def fetch_parallel(job_list):
    results = []
    queue = Queue.Queue()
    while job_list:
        threads = [threading.Thread(target=avhash, args=(job, queue))
                   for job in job_list[:CHUNK_SIZE]]
        job_list = job_list[CHUNK_SIZE:]
        for thread in threads:
            thread.daemon = True
            thread.start()
        for thread in threads:
            thread.join()
        results.extend(queue.get() for _ in threads)
    return results
(untested)
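A variant worth knowing (my sketch, not part of the answer above): chunking waits for the slowest job in each batch before starting the next one. A pool of 50 long-lived workers pulling from a shared job queue avoids that stall. Here `work` is a hypothetical stand-in for the body of avhash, and the import dance covers both the Python 2 and Python 3 module names:

```python
import threading
try:
    import Queue as queue_mod   # Python 2 name
except ImportError:
    import queue as queue_mod   # Python 3 name

NUM_WORKERS = 50

def fetch_parallel(job_list, work):
    # `work` takes one job and returns one result (stand-in for avhash)
    jobs = queue_mod.Queue()
    results = queue_mod.Queue()
    for job in job_list:
        jobs.put(job)

    def worker():
        while True:
            try:
                job = jobs.get_nowait()
            except queue_mod.Empty:
                return              # no jobs left: let this thread exit
            results.put(work(job))

    threads = [threading.Thread(target=worker) for _ in range(NUM_WORKERS)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return [results.get() for _ in job_list]
```

Since all jobs are queued before any worker starts, get_nowait() raising Empty reliably means the work is done, and the final loop reads exactly as many results as jobs were submitted.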
[/EDIT]
Upvotes: 0