Reputation: 321
This code runs ok for a little bit, then it gives me this error:
thread.error: can't start new thread
What am I doing wrong? The names file is about 10,000 names long; the email file is about 5 emails long.
# For every name in the names file, push every e-mail through a worker queue.
for x in open(names):
    name = x.strip()

    # check() is (re)defined on every iteration; it reads `name` from the
    # surrounding scope at the moment it prints.
    def check(q):
        # Worker body: take e-mails off the queue forever.  q.get() blocks
        # while the queue is empty, so this loop never ends on its own.
        while True:
            email = q.get()
            # The shared lock lets only one thread print at a time.
            lock.acquire()
            print email, name, threading.active_count()
            lock.release()
            #Do things in
            #the internet
            # Mark the item as finished so that q.join() can return.
            q.task_done()
        return  # not reached: the loop above has no break

    # Start `threads` new daemon workers for this name.  Workers started for
    # earlier names are not stopped anywhere in this snippet.
    for i in range(threads):
        t = threading.Thread(target=check, args=(q,))
        t.setDaemon(True)
        t.start()
    # Queue every e-mail, then wait until all of them have been marked done.
    for word in open(emails):
        q.put(word.strip())
    q.join()
I only specify 2 threads, but it ends up creating hundreds then crashes when the active_count is around 890. How can I fix this?
Upvotes: 1
Views: 4803
Reputation: 1371
Here is a slightly modified version using a semaphore object:
import threading
import Queue
# Size of the worker pool; change this if you want.
NUM_THREADS = 2
threads = NUM_THREADS

# State shared between the worker function and the driver loop.
q = Queue.Queue()                             # e-mails waiting to be processed
lock = threading.Lock()                       # held by check() around its print
semaphore = threading.Semaphore(NUM_THREADS)  # held by check() while it drains q
running_threads = []                          # Thread objects the driver joins
# moved the check function out of the loop
def check(name, q, s):
    """Drain e-mails from ``q`` and print each one together with ``name``.

    The worker returns as soon as the queue is found empty instead of
    blocking forever, so its thread terminates and can be joined.

    name -- value printed next to every e-mail.
    q    -- queue of e-mails; every fetched item is acknowledged with
            ``q.task_done()``.
    s    -- semaphore held for the whole drain; it caps how many workers
            run this loop at the same time.
    """
    with s:
        while True:
            try:
                # Non-blocking get: raises Queue.Empty once the queue is drained.
                email = q.get(False)
            except Queue.Empty:
                return
            try:
                # Printing goes through the shared module-level lock.
                with lock:
                    print("%s %s %s" % (email, name, threading.active_count()))
                # additional work ...
            finally:
                # Acknowledge the item even if the work above raised;
                # otherwise a later q.join() would wait forever.
                q.task_done()
# The e-mail list is the same for every name, so read it once up front
# instead of re-opening the file on each iteration.
with open(emails) as emails_file:
    email_list = [word.strip() for word in emails_file]

with open(names) as names_file:
    for x in names_file:
        name = x.strip()
        # Queue the work for this name before starting its workers, so that
        # a worker does not find the queue empty and exit immediately.
        for email in email_list:
            q.put(email)
        for i in range(threads):
            t = threading.Thread(target=check, args=(name, q, semaphore))
            # t.setDaemon(True) # we are not setting the daemon flag
            t.start()
            running_threads.append(t)
        # joining threads (we need this if the daemon flag is false)
        for t in running_threads:
            t.join()
        # Forget the finished threads so the list does not grow with every
        # name and old threads are not joined again.
        del running_threads[:]
        # joining queue (Probably won't need this if the daemon flag is false)
        q.join()
Upvotes: 1
Reputation: 414079
You could simplify your code using a thread pool:
from contextlib import closing
from itertools import product
from multiprocessing.dummy import Pool # thread pool
def foo(arg):
    """Process one ``(name, email)`` pair and report the outcome as a tuple.

    arg -- pair of strings (for example raw lines from the two input
           files); both are stripped of surrounding whitespace before use.

    Returns ``((name, email), result, error)``.  On success ``error`` is
    ``None``; on failure ``result`` is ``None`` and ``error`` is the text of
    the exception, which is returned to the caller rather than raised.
    """
    name, email = map(str.strip, arg)
    try:
        # "do things in the internet" -- put the real work here and store
        # its outcome in ``result``.
        result = None
    except Exception as e:
        return (name, email), None, str(e)
    else:
        return (name, email), result, None
# Pair every name with every e-mail and fan the pairs out over a thread pool.
with open(names_filename) as names_file, \
        open(emails_filename) as emails_file, \
        closing(Pool(max_threads_count)) as pool:
    # Cartesian product of the two files: one (name_line, email_line) per task.
    pairs = product(names_file, emails_file)
    outcomes = pool.imap_unordered(foo, pairs, chunksize=100)
    # Only failures are reported; successful results are ignored here.
    for (name, email), result, error in outcomes:
        if error is None:
            continue
        print("Failed to foo {} {}: {}".format(name, email, error))
Upvotes: 0