Reputation: 1230
I've written a Python script that will download files from a website. To speed it up, I've made the downloading of the files multithreaded. Obviously, this is faster than doing the downloads serially, but I've come across some effects that I cannot explain.
The first x files downloaded (x seems proportional to the number of threads created) come in incredibly fast, with the output showing upwards of 40 files per second, but after that it slows down a lot. So, my question is: why do the downloads start out so fast and then slow down?
Here is my script:
#!/usr/bin/python
import inspect
import math
from queue import Queue
from urllib.request import ProxyHandler, build_opener
from ast import literal_eval
from time import time, sleep
from datetime import timedelta
import random
from threading import Thread, activeCount
import os
proxies = Queue()
threads = Queue()
agents = []
total_files = 0
finished_files = 0
downloaded_files = 0
start_time = 0
class Config(object):
    DEBUG = False
    PROXIES_PATH = '/home/shane/bin/proxies.txt'
    AGENTS_PATH = '/home/shane/bin/user-agents.txt'
    DESTINATION_PATH = '/home/shane/images/%d.jpg'
    SOURCE_URL = 'https://example.org/%d.jpg'
    MAX_THREADS = 500
    TIMEOUT = 62
    RETRIES = 1
    RETRIES_TIME = 1
def get_files_per_second():
    return float(downloaded_files) / (time() - start_time)
def get_time_remaining():
    delta = timedelta(seconds=float(total_files - finished_files) / get_files_per_second())
    seconds = delta.total_seconds()
    days, remainder = divmod(seconds, 86400)
    hours, remainder = divmod(remainder, 3600)
    minutes, seconds = divmod(remainder, 60)
    days = str(int(days)).zfill(2)
    hours = str(int(hours)).zfill(2)
    minutes = str(int(minutes)).zfill(2)
    seconds = str(int(seconds)).zfill(2)
    return "%s:%s:%s:%s" % (days, hours, minutes, seconds)
def release_proxy(opener):
    if Config.DEBUG:
        print('Releasing proxy')
    for handler in opener.handlers:
        if type(handler) is ProxyHandler:
            proxies.put(handler)
            return
    raise Exception('No proxy found')
def get_new_proxy():
    if Config.DEBUG:
        print('Getting new proxy')
    if proxies.empty():
        raise Exception('No proxies')
    return proxies.get()
def get_new_agent():
    if len(agents) == 0:
        raise Exception('No user agents')
    return random.choice(agents)
def get_new_opener():
    opener = build_opener(get_new_proxy())
    opener.addheaders = [('User-Agent', get_new_agent())]
    return opener
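# Fetch one file through the given opener. On failure, retry with the same proxy up to
# Config.RETRIES times, then fall back to a fresh opener/proxy and start the count over.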
def download(opener, source, destination, tries=0):
    global finished_files, downloaded_files
    if Config.DEBUG:
        print('Downloading %s to %s' % (source, destination))
    try:
        result = opener.open(source, timeout=Config.TIMEOUT).read()
        with open(destination, 'wb') as d:
            d.write(result)
        release_proxy(opener)
        finished_files += 1
        downloaded_files += 1
        to_print = '(%d/%d files) (%d proxies) (%f files/second, %s left) (%d threads) %s'
        print(to_print % (finished_files, total_files, proxies.qsize(), round(get_files_per_second(), 2), get_time_remaining(), activeCount(), source))
    except Exception as e:
        if Config.DEBUG:
            print(e)
        if tries < Config.RETRIES:
            sleep(Config.RETRIES_TIME)
            download(opener, source, destination, tries + 1)
        else:
            if proxies.qsize() < Config.MAX_THREADS * 2:
                release_proxy(opener)
            download(get_new_opener(), source, destination, 0)
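# Worker thread: downloads one file, then starts the next queued Downloader so that
# roughly MAX_THREADS threads stay active until the queue is drained.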
class Downloader(Thread):
    def __init__(self, source, destination):
        Thread.__init__(self)
        self.source = source
        self.destination = destination
    def run(self):
        if Config.DEBUG:
            print('Running thread')
        download(get_new_opener(), self.source, self.destination)
        if threads.qsize() > 0:
            threads.get().start()
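# Each line of the proxies file is parsed with literal_eval, so it is expected to be a
# dict literal such as {'http': 'http://host:port'} that ProxyHandler accepts.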
def populate_proxies():
    if Config.DEBUG:
        print('Populating proxies')
    with open(Config.PROXIES_PATH, 'r') as fh:
        for line in fh:
            line = line.replace('\n', '')
            if Config.DEBUG:
                print('Adding %s to proxies' % line)
            proxies.put(ProxyHandler(literal_eval(line)))
def populate_agents():
    if Config.DEBUG:
        print('Populating agents')
    with open(Config.AGENTS_PATH, 'r') as fh:
        for line in fh:
            line = line.replace('\n', '')
            if Config.DEBUG:
                print('Adding %s to agents' % line)
            agents.append(line)
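# Build a Downloader thread for every file index and queue it; start_work() later
# starts at most MAX_THREADS of them at once.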
def populate_threads():
    global total_files, finished_files
    if Config.DEBUG:
        print('Populating threads')
    for x in range(0, 100000):
        source = Config.SOURCE_URL % x
        destination = Config.DESTINATION_PATH % x
        total_files += 1
        # queue threads
        print('Queueing %s' % source)
        threads.put(Downloader(source, destination))
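# Start the initial batch of threads; each finished thread chains to the next queued
# Downloader (see Downloader.run).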
def start_work():
    global start_time
    if threads.qsize() == 0:
        raise Exception('No work to be done')
    start_time = time()
    for x in range(0, min(threads.qsize(), Config.MAX_THREADS)):
        if Config.DEBUG:
            print('Starting thread %d' % x)
        threads.get().start()
populate_proxies()
populate_agents()
populate_threads()
start_work()
Upvotes: 0
Views: 726
Reputation: 244
The number of threads you are using is very high. Python does not actually run threads in parallel; it just switches between them frequently, which only looks like parallelism. If the task is CPU-intensive, use multiprocessing; if it is I/O-intensive, threads are useful. Keep the thread count low (10-70 on a normal quad-core PC with 8 GB of RAM), otherwise the switching overhead will reduce the speed of your code.
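For comparison, a bounded worker pool can be written with the standard library's concurrent.futures.ThreadPoolExecutor instead of hand-managing a queue of Thread objects. This is only a minimal sketch: the URL pattern, destination path, file count, and worker count below are placeholders, not values from your config.
# Minimal sketch of a bounded-thread downloader using only the standard library.
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.request import urlopen

SOURCE_URL = 'https://example.org/%d.jpg'    # placeholder pattern
DESTINATION_PATH = '/tmp/images/%d.jpg'      # placeholder pattern
NUM_FILES = 1000                             # placeholder count
MAX_WORKERS = 32                             # small pool; I/O-bound work rarely needs hundreds of threads

def fetch(index):
    # One download per task; the executor caps how many run at once.
    data = urlopen(SOURCE_URL % index, timeout=60).read()
    with open(DESTINATION_PATH % index, 'wb') as fh:
        fh.write(data)
    return index

with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
    futures = [pool.submit(fetch, i) for i in range(NUM_FILES)]
    for future in as_completed(futures):
        print('Finished %d' % future.result())
The max_workers value is then the single knob to experiment with when looking for the point where adding more threads stops helping.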
Upvotes: 1