I'm creating a threaded Python script that puts a collection of files into a queue and then starts a configurable number of threads (default 3) to download them. When each thread finishes a download, it writes the queue status and a completion percentage to stdout. All the files are downloaded, but the status information is wrong on the 3rd thread and I'm not sure why. I've considered creating a `work_completed` queue for the calculation, but I don't think I should have to, or that it would matter. Could someone point me in the right direction?
```python
import os
import queue
import sys
import threading

download_queue = queue.Queue()


class Downloader(threading.Thread):
    def __init__(self, work_queue):
        super().__init__()
        self.current_job = 0
        self.work_queue = work_queue
        self.queue_size = work_queue.qsize()

    def run(self):
        while self.work_queue.qsize() > 0:
            url = self.work_queue.get(True)
            try:
                # assumption: the original script defines local_file elsewhere
                local_file = url.split('/')[-1]
                system_call = "wget -nc -q {0} -O {1}".format(url, local_file)
                os.system(system_call)
                self.current_job = int(self.queue_size) - int(self.work_queue.qsize())
                self.percent = (self.current_job / self.queue_size) * 100
                status = "\rDownloading " + url.split('/')[-1] + " [status: " + str(self.current_job) + "/" + str(self.queue_size) + ", " + str(round(self.percent, 2)) + "%]"
                sys.stdout.write(status)
                sys.stdout.flush()
            finally:
                self.work_queue.task_done()


def main():
    # options comes from the script's command-line parsing (not shown)
    if download_queue.qsize() > 0:
        if options.active_downloads:
            active_downloads = options.active_downloads
        else:
            active_downloads = 3
        for x in range(active_downloads):
            downloader = Downloader(download_queue)
            downloader.start()
        download_queue.join()
```
If you'd like to use the `multiprocessing` module, it includes a very nice parallel `imap_unordered`, which reduces your problem to the very elegant:
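```python
import multiprocessing
import subprocess
import sys


def download(url):
    # Module-level on purpose: a bound method would pickle its instance,
    # and an instance holding a Pool cannot be pickled.
    local_file = url.split('/')[-1]  # assumption: save under the URL's basename
    subprocess.run(["wget", "-nc", "-q", url, "-O", local_file])
    return "Downloaded " + local_file


class ParallelDownload:
    def __init__(self, urls, processcount=3):
        self.total_items = len(urls)
        with multiprocessing.Pool(processcount) as pool:
            # imap_unordered yields each result as soon as its worker finishes;
            # start counting at 1 so the last item reports 100%.
            for n, status in enumerate(pool.imap_unordered(download, urls), 1):
                stats = (n, self.total_items, 100.0 * n / self.total_items)
                sys.stdout.write(status + " [%d/%d = %0.2f%%]\n" % stats)


if __name__ == "__main__":
    ParallelDownload(sys.argv[1:])  # hypothetical entry point: URLs from the command line
```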
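Because each task spends its time waiting on the network, the same `imap_unordered` loop also works with `multiprocessing.pool.ThreadPool`, which offers the `Pool` interface backed by threads and so needs no pickling. A sketch under that assumption; `fake_download` and `download_all` are hypothetical names, and `fake_download` only sleeps where the real script would call wget:

```python
import sys
import time
from multiprocessing.pool import ThreadPool


def fake_download(url):
    # Hypothetical stand-in for the wget call.
    time.sleep(0.01)
    return "Downloaded " + url.split('/')[-1]


def download_all(urls, workers=3, out=sys.stdout):
    total = len(urls)
    with ThreadPool(workers) as pool:
        # Only the main thread counts, so n is exactly the number of
        # tasks that have finished so far.
        for n, status in enumerate(pool.imap_unordered(fake_download, urls), 1):
            out.write("%s [%d/%d = %0.2f%%]\n" % (status, n, total, 100.0 * n / total))
    return total
```

The key point is the same in both versions: progress is counted by the single consumer of the results, never recomputed from the queue size.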
You can't check the queue size in one statement and then `.get()` from the queue in the next: in between, another thread may have taken the last item, and a blocking `.get(True)` will then wait forever. The `.get()` call is the single atomic operation you need. If it raises `queue.Empty` (in non-blocking mode) or blocks, the queue is empty.
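A minimal sketch of that pattern using only the standard `queue` and `threading` modules: each worker calls `get_nowait()` and treats `queue.Empty` as the signal to stop, so there is no separate size check to race against. It assumes, as in the question, that the queue is completely filled before the workers start. `drain` and its `handle` callback are illustrative names; `handle` stands in for the wget call.

```python
import queue
import threading


def drain(work_queue, handle):
    """Take items until the queue is empty; get_nowait() is the only check."""
    while True:
        try:
            item = work_queue.get_nowait()
        except queue.Empty:
            return  # nothing left: let this thread finish
        try:
            handle(item)
        finally:
            work_queue.task_done()  # only for items actually taken


if __name__ == "__main__":
    q = queue.Queue()
    for i in range(10):
        q.put(i)
    done = []
    threads = [threading.Thread(target=drain, args=(q, done.append)) for _ in range(3)]
    for t in threads:
        t.start()
    q.join()
    for t in threads:
        t.join()
    print(sorted(done))
```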
Your threads can also overwrite each other's output. I would add another thread with its own input queue whose only job is to print the items from that queue to stdout. It can also count the completed items and produce the status information.
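A minimal sketch of such a reporter, assuming the download threads put each finished file name on a separate `results` queue and that `None` is used as a stop sentinel (`report` and `results` are names introduced here for illustration). Because only this function writes to the output and only it keeps the count, the numbers cannot be garbled by the other threads.

```python
import queue
import sys
import threading


def report(results, total, out=sys.stdout):
    """Sole writer to `out`: counts finished items and prints progress."""
    completed = 0
    while True:
        name = results.get()
        if name is None:  # sentinel: all workers are done
            break
        completed += 1
        percent = 100.0 * completed / total
        out.write("\rDownloaded %s [status: %d/%d, %.2f%%]" % (name, completed, total, percent))
        out.flush()
    out.write("\n")
    return completed


if __name__ == "__main__":
    results = queue.Queue()
    files = ["a.txt", "b.txt", "c.txt"]
    printer = threading.Thread(target=report, args=(results, len(files)))
    printer.start()
    for name in files:  # in the real script, each worker would put its file name
        results.put(name)
    results.put(None)
    printer.join()
```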
I also tend not to subclass `Thread`; instead I create a plain `Thread` instance with a `target=` parameter and `.start()` it.
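For comparison, a sketch of that non-subclassing style: a plain `worker` function handed to `threading.Thread(target=..., args=...)`. Unlike the `get(False)` approach, each worker blocks on `get()` and stops when it receives a `None` sentinel (one per worker), so it also works if URLs are queued after the threads start. `worker`, `run_workers` and `handle` are illustrative names; `handle` stands in for the wget call.

```python
import queue
import threading


def worker(work_queue, handle):
    """Plain function used as a Thread target; stops on a None sentinel."""
    while True:
        url = work_queue.get()  # blocks until an item is available
        try:
            if url is None:
                return
            handle(url)
        finally:
            work_queue.task_done()  # also runs for the sentinel


def run_workers(urls, handle, count=3):
    work_queue = queue.Queue()
    threads = [threading.Thread(target=worker, args=(work_queue, handle))
               for _ in range(count)]
    for t in threads:
        t.start()
    for url in urls:
        work_queue.put(url)
    for _ in threads:
        work_queue.put(None)  # one sentinel per worker
    work_queue.join()
    for t in threads:
        t.join()
```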
Based on your response, try this:
```python
import os
import queue
import sys
import threading

download_queue = queue.Queue()


class Downloader(threading.Thread):
    def __init__(self, work_queue, original_size):
        super().__init__()
        self.current_job = 0
        self.work_queue = work_queue
        self.queue_size = original_size

    def run(self):
        while True:
            try:
                url = self.work_queue.get(False)
            except queue.Empty:
                # Queue drained: stop, and don't call task_done() for an
                # item we never got (that would raise ValueError).
                return
            try:
                # assumption: the original script defines local_file elsewhere
                local_file = url.split('/')[-1]
                system_call = "wget -nc -q {0} -O {1}".format(url, local_file)
                os.system(system_call)
                # the following code is questionable. By the time we get here,
                # many other items may have been taken off the queue.
                self.current_job = int(self.queue_size) - int(self.work_queue.qsize())
                self.percent = (self.current_job / self.queue_size) * 100
                status = ("\rDownloading " + url.split('/')[-1] +
                          " [status: " + str(self.current_job) +
                          "/" + str(self.queue_size) + ", " +
                          str(round(self.percent, 2)) + "%]")
                sys.stdout.write(status)
                sys.stdout.flush()
            finally:
                self.work_queue.task_done()


def main():
    # options comes from the script's command-line parsing (not shown)
    if download_queue.qsize() > 0:
        original_size = download_queue.qsize()
        if options.active_downloads:
            active_downloads = options.active_downloads
        else:
            active_downloads = 3
        for x in range(active_downloads):
            downloader = Downloader(download_queue, original_size)
            downloader.start()
        download_queue.join()
```
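If you keep the threaded version, one way to replace the questionable `current_job` calculation (instead of a second `work_completed` queue) is a counter shared by all threads and guarded by a `threading.Lock`, incremented once per finished download. A minimal sketch; `ProgressCounter` and `finish_one` are names introduced here, not part of any library:

```python
import threading


class ProgressCounter:
    """Counts finished jobs; the lock makes increment-and-read one step."""

    def __init__(self, total):
        self.total = total
        self.completed = 0
        self._lock = threading.Lock()

    def finish_one(self):
        # Increment and read under the same lock so each caller gets a
        # distinct count (1, 2, ..., total) and the matching percentage.
        with self._lock:
            self.completed += 1
            done = self.completed
        return done, 100.0 * done / self.total
```

Each `Downloader` would receive the same `ProgressCounter(original_size)` and call `finish_one()` right after `os.system(...)`, in place of the two lines that recompute the count from `qsize()`.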