I'm trying to write a program that crawls through a website and downloads all the videos it has. The problem is that the number of threads keeps increasing, even after the individual videos have finished downloading.
Here is the code for the individual Worker object, which is queued and then joined later. This is the only place in the code where I create a Thread. What I don't understand is how any threads can remain when each Worker calls self.stop() and its while loop breaks.
import sys
from threading import Thread

class Worker(Thread):
    def __init__(self, thread_pool):
        Thread.__init__(self)
        self.tasks = thread_pool.tasks
        self.tasks_info = thread_pool.tasks_info
        self.daemon = True
        self._is_running = True
        self.start()

    def stop(self):
        self._is_running = False

    def run(self):
        while self._is_running:
            # take the next (function, args, kwargs) task from the shared queue
            func, args, kargs = self.tasks.get()
            try:
                func(*args, **kargs)
            except Exception:
                print("\nError: Threadpool error.")
                sys.exit(1)
            self.tasks_info['num_tasks_complete'] += 1
            self.tasks.task_done()
            self.stop()
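On the error branch above: `sys.exit()` only raises `SystemExit`, and when that happens in a thread other than the main thread, the `threading` module silently ends just that thread while the program keeps running. In the posted `run()` it also means `self.tasks.task_done()` is skipped for the failed task, so a later `tasks.join()` would keep waiting for it. A small sketch with a hypothetical failing task:

```python
import sys
import threading


def failing_download():
    # Hypothetical task standing in for a video download that raises.
    raise ValueError("download failed")


def worker_body():
    try:
        failing_download()
    except Exception:
        print("Error: Threadpool error.")
        sys.exit(1)  # raises SystemExit, which only unwinds this thread


t = threading.Thread(target=worker_body)
t.start()
t.join()
# The main thread is unaffected; only the worker thread has ended.
print("main thread still running; worker alive:", t.is_alive())
```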
I've used the threading module's functions to check which threads are alive, and it turns out they are mostly the Worker threads, plus entries shown as Thread(SockThread) and _MainThread, which I don't know how to close.
Please advise on:
1. why the Worker threads are not ending, and
2. how to get rid of the Thread(SockThread) and _MainThread threads.
Thank you!
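To see what those entries are, `threading.enumerate()` returns the live `Thread` objects themselves rather than just a count. In a thread's repr such as `Thread(SockThread, ...)`, the part before the parenthesis is the class and `SockThread` is the thread's name, so that entry is a plain `Thread` named `SockThread`, which the posted code does not create. The `_MainThread` entry is the interpreter's main thread, i.e. the thread running the script itself. A minimal sketch (the helper name `describe_threads` is hypothetical):

```python
import threading


def describe_threads():
    """Hypothetical helper: (class name, thread name, daemon flag) for every live thread."""
    return [(type(t).__name__, t.name, t.daemon) for t in threading.enumerate()]


for cls_name, name, daemon in describe_threads():
    print(f"{cls_name:<15} {name:<20} daemon={daemon}")

# The main thread always appears in the list; in CPython its class is _MainThread.
main = threading.main_thread()
print("main thread:", type(main).__name__, main.name)
```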
Edit 1:
import threading
from queue import Queue

class ThreadPool:
    def __init__(self, name, num_threads, num_tasks):
        self.tasks = Queue(num_threads)
        self.num_threads = num_threads
        self.tasks_info = {
            'name': name,
            'num_tasks': num_tasks,
            'num_tasks_complete': 0
        }
        # each Worker starts itself in its own __init__
        for _ in range(num_threads):
            Worker(self)
        print(threading.active_count())

    def add_task(self, func, *args, **kwargs):
        self.tasks.put((func, args, kwargs))

    def wait_completion(self):
        print("at the beginning of wait_completion:")
        print(threading.active_count())
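One possible reason for the growing count, sketched under assumptions: each posted `Worker` leaves its loop only after finishing a task (via `self.stop()`), so a worker that never receives a task stays blocked in `self.tasks.get()` indefinitely; as a daemon thread it is only cleaned up when the program exits. If `num_threads` is larger than the number of tasks added, or if a new `ThreadPool` is created for each batch of downloads (the calling code isn't shown, so this is an assumption), those idle workers accumulate. The hypothetical `OneShotWorker` below mimics the posted run/stop logic without the pool bookkeeping:

```python
import threading
from queue import Queue


class OneShotWorker(threading.Thread):
    """Hypothetical stand-in for the posted Worker: it stops itself after one task."""

    def __init__(self, tasks):
        super().__init__(daemon=True)
        self.tasks = tasks
        self._is_running = True

    def run(self):
        while self._is_running:
            func, args = self.tasks.get()  # blocks forever if no task ever arrives
            func(*args)
            self.tasks.task_done()
            self._is_running = False       # same effect as self.stop() in the posted code


def leftover_workers(num_threads, num_tasks):
    """Start num_threads workers, queue num_tasks tasks, and count workers still alive."""
    tasks = Queue()
    workers = [OneShotWorker(tasks) for _ in range(num_threads)]
    for w in workers:
        w.start()
    for _ in range(num_tasks):
        tasks.put((sum, ([1, 2, 3],)))
    tasks.join()                           # every queued task has been processed
    for w in workers:
        w.join(timeout=0.5)                # finished workers exit; idle ones stay in get()
    return sum(w.is_alive() for w in workers)


print(leftover_workers(num_threads=5, num_tasks=2))  # 3 idle workers remain
```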
Looking at your code, you call start() from inside __init__, so each thread begins executing run() as soon as the Worker object is constructed. That is not the proper way to do it: create the thread first and call start() separately (for example, from the ThreadPool after creating each Worker). Your code should be as follows:
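from threading import Event, Thread

class Worker(Thread):
    def __init__(self, thread_pool):
        # initialise the Thread base class first; super(Thread, self) would skip Thread.__init__
        super().__init__(daemon=True)
        self.tasks = thread_pool.tasks
        self.tasks_info = thread_pool.tasks_info
        self.exit = Event()
        # no self.start() here: the pool calls worker.start() after creating the Worker

    def shutdown(self):
        self.exit.set()

    def run(self):
        while not self.exit.is_set():
            func, args, kargs = self.tasks.get()
            try:
                func(*args, **kargs)
                self.tasks_info['num_tasks_complete'] += 1
            except Exception:
                print("\nError: Threadpool error.")
                # use shutdown method for error; sys.exit() here would only end this
                # thread and skip task_done(), leaving tasks.join() waiting forever
                self.shutdown()
            finally:
                self.tasks.task_done()
            self.shutdown()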
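With both the original flag and the `Event` version, the exit condition is only checked between tasks, so a worker waiting in a plain `self.tasks.get()` never notices `shutdown()`. A minimal sketch of one way around that, assuming Python 3 and a simplified pool (the `poll_interval` parameter and the `wait_completion` body are illustrative, not from the posts): give `get()` a timeout so the loop can re-check the `Event`, call `task_done()` in a `finally` block, and start the workers from the pool rather than from `Worker.__init__`.

```python
import threading
from queue import Empty, Queue


class Worker(threading.Thread):
    """Sketch: a worker that keeps taking tasks until its exit Event is set."""

    def __init__(self, tasks, poll_interval=0.1):
        super().__init__(daemon=True)
        self.tasks = tasks
        self.poll_interval = poll_interval  # hypothetical: how often to re-check the Event
        self.exit = threading.Event()

    def shutdown(self):
        self.exit.set()

    def run(self):
        while not self.exit.is_set():
            try:
                # A timeout lets the loop re-check the Event instead of blocking forever.
                func, args, kwargs = self.tasks.get(timeout=self.poll_interval)
            except Empty:
                continue
            try:
                func(*args, **kwargs)
            except Exception as exc:
                print(f"Error: task failed: {exc!r}")
            finally:
                self.tasks.task_done()  # always mark the task done so tasks.join() can return


class ThreadPool:
    def __init__(self, num_threads):
        self.tasks = Queue()
        self.workers = [Worker(self.tasks) for _ in range(num_threads)]
        for w in self.workers:
            w.start()  # started here, not inside Worker.__init__

    def add_task(self, func, *args, **kwargs):
        self.tasks.put((func, args, kwargs))

    def wait_completion(self):
        self.tasks.join()  # wait for every queued task
        for w in self.workers:
            w.shutdown()
        for w in self.workers:
            w.join()  # each worker exits within about one poll interval


results = []
pool = ThreadPool(num_threads=4)
for n in range(10):
    pool.add_task(results.append, n * n)
pool.wait_completion()
print(sorted(results))
print(threading.active_count())
```

`results.append` is used as the task only because list appends are safe to call from several threads in CPython. A common alternative to polling is to put one sentinel value (such as `None`) per worker on the queue and have `run()` return when it receives one.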