Shane
Shane

Reputation: 1230

Speeding up multithreaded downloads

I've written a Python script that will download files from a website. To speed it up, I've made the downloading of the files multithreaded. Obviously, this is faster than doing the downloads serially, but I've come across some effects that I cannot explain.

  1. The first x files (seems proportional to the amount of threads created) downloaded are incredibly fast--the output shows upwards of 40 files per second--but after that, slows down a lot.
  2. Up to a point (near 200 threads), the maximum speed at which I can download files averages 10 files per second. If I increase the thread count to, say, 700, it still maxes out at 10 files per second. Increasing the thread count to a very large number (over 1,000) seems to limit the download speed based on CPU speed.

So, my questions are:

  1. Why are the first files I download downloaded so fast compared to the rest and can I maintain the original speed?
  2. Why does the thread count have such diminishing returns for download speeds?

Here is my script:

#!/usr/bin/python

import inspect
import math
import os
import random
from ast import literal_eval
from datetime import timedelta
from queue import Empty, Queue
from threading import Thread, activeCount
from time import sleep, time
from urllib.request import ProxyHandler, build_opener

# Shared mutable state used by all worker threads.
proxies = Queue()  # pool of ProxyHandler instances available for (re)use
threads = Queue()  # Downloader threads queued but not yet started
agents = []  # user-agent strings loaded from Config.AGENTS_PATH
total_files = 0  # files scheduled for download (drives the progress display)
finished_files = 0  # completed downloads (NOTE(review): += from threads is not atomic)
downloaded_files = 0  # downloads since start_time; feeds get_files_per_second()
start_time = 0  # epoch seconds, set by start_work()

class Config(object):
    """Static configuration for the downloader script."""
    DEBUG = False  # verbose progress logging when True
    PROXIES_PATH = '/home/shane/bin/proxies.txt'  # one proxy dict literal per line
    AGENTS_PATH = '/home/shane/bin/user-agents.txt'  # one user-agent string per line
    DESTINATION_PATH = '/home/shane/images/%d.jpg'  # local file template (%d = index)
    SOURCE_URL = 'https://example.org/%d.jpg'  # remote URL template (%d = index)
    MAX_THREADS = 500  # upper bound on concurrently running Downloader threads
    TIMEOUT = 62  # per-request socket timeout, seconds
    RETRIES = 1  # retries with the same proxy before swapping it out
    RETRIES_TIME = 1  # sleep between retries, seconds

def get_files_per_second():
    """Average download rate (files/second) since start_work() began."""
    elapsed = time() - start_time
    return downloaded_files / elapsed

def get_time_remaining():
    """Estimate the remaining wall-clock time as a 'DD:HH:MM:SS' string."""
    remaining = float(total_files - finished_files) / get_files_per_second()
    secs = timedelta(seconds=remaining).total_seconds()
    parts = []
    # Peel off days, hours and minutes; whatever is left is seconds.
    for unit in (86400, 3600, 60):
        whole, secs = divmod(secs, unit)
        parts.append(int(whole))
    parts.append(int(secs))
    return ':'.join('%02d' % p for p in parts)

def release_proxy(opener):
    """Return the ProxyHandler attached to *opener* to the shared pool.

    Raises:
        Exception: if the opener carries no ProxyHandler.
    """
    if Config.DEBUG:
        print('Releasing proxy')
    # Exact type match on purpose: mirror of how openers are built here.
    handler = next((h for h in opener.handlers if type(h) is ProxyHandler), None)
    if handler is None:
        raise Exception('No proxy found')
    proxies.put(handler)

def get_new_proxy():
    """Pop a ProxyHandler from the shared pool.

    Uses get_nowait() so the emptiness check and the pop are one atomic
    queue operation; the original empty()-then-get() pair could block
    forever if another thread drained the queue between the two calls.

    Raises:
        Exception: if no proxies are available.
    """
    if Config.DEBUG:
        print('Getting new proxy')
    try:
        return proxies.get_nowait()
    except Empty:
        raise Exception('No proxies')

def get_new_agent():
    """Pick a random user-agent string from the loaded pool.

    Raises:
        Exception: if no user agents were loaded.
    """
    if not agents:
        raise Exception('No user agents')
    return random.choice(agents)

def get_new_opener():
    """Build a urllib opener bound to a pooled proxy and a random user agent."""
    proxy_handler = get_new_proxy()
    opener = build_opener(proxy_handler)
    opener.addheaders = [('User-Agent', get_new_agent())]
    return opener

def download(opener, source, destination, tries=0):
    """Fetch *source* through *opener* and write the bytes to *destination*.

    Retries with the same proxy up to Config.RETRIES times, then returns the
    proxy to the pool (while the pool is below its high-water mark), swaps in
    a fresh opener and starts over.  The original implementation recursed on
    every failure, so a long run of bad proxies would eventually raise
    RecursionError; this version loops with identical observable behavior.

    Args:
        opener: urllib opener (as built by get_new_opener()).
        source: URL to download.
        destination: local file path to write.
        tries: retries already spent on the current opener.
    """
    global finished_files, downloaded_files
    while True:
        if Config.DEBUG:
            print('Downloading %s to %s' % (source, destination))
        try:
            result = opener.open(source, timeout=Config.TIMEOUT).read()
            with open(destination, 'wb') as d:
                d.write(result)
            release_proxy(opener)
            # NOTE(review): += on module globals is not atomic across threads,
            # so these counters can under-count under heavy concurrency.
            finished_files += 1
            downloaded_files += 1
            to_print = '(%d/%d files) (%d proxies) (%f files/second, %s left) (%d threads) %s'
            print(to_print % (finished_files, total_files, proxies.qsize(), round(get_files_per_second(), 2), get_time_remaining(), activeCount(), source))
            return
        except Exception as e:
            if Config.DEBUG:
                print(e)
            if tries < Config.RETRIES:
                # Transient failure: wait, then retry with the same proxy.
                sleep(Config.RETRIES_TIME)
                tries += 1
            else:
                # Proxy exhausted its retries: recycle it only while the pool
                # is below the high-water mark, then switch to a fresh opener.
                if proxies.qsize() < Config.MAX_THREADS * 2:
                    release_proxy(opener)
                opener = get_new_opener()
                tries = 0

class Downloader(Thread):
    """Worker thread: download one file, then start the next queued worker."""

    def __init__(self, source, destination):
        super().__init__()
        self.source = source  # URL to fetch
        self.destination = destination  # local path to write

    def run(self):
        if Config.DEBUG:
            print('Running thread')
        download(get_new_opener(), self.source, self.destination)
        # Chain: hand the CPU to the next queued Downloader, if any.
        if not threads.empty():
            threads.get().start()

def populate_proxies():
    """Load one proxy mapping per line from PROXIES_PATH into the pool.

    Each line must be a Python dict literal; it is parsed with
    ast.literal_eval, which accepts literals only and is safe on
    untrusted input.
    """
    if Config.DEBUG:
        print('Populating proxies')
    with open(Config.PROXIES_PATH, 'r') as fh:
        for raw in fh:
            entry = raw.replace('\n', '')
            if Config.DEBUG:
                print('Adding %s to proxies' % entry)
            proxies.put(ProxyHandler(literal_eval(entry)))

def populate_agents():
    """Read user-agent strings (one per line) from AGENTS_PATH into *agents*."""
    if Config.DEBUG:
        print('Populating agents')
    with open(Config.AGENTS_PATH, 'r') as fh:
        for raw in fh:
            agent = raw.replace('\n', '')
            if Config.DEBUG:
                print('Adding %s to agents' % agent)
            agents.append(agent)

def populate_threads(count=100000):
    """Queue one Downloader per file index and record the total file count.

    Fixes three bugs in the original: it referenced an undefined ``source``
    name (a guaranteed NameError), it stored the URL in a variable called
    ``destination`` and never used Config.DESTINATION_PATH, and it declared
    ``total_files`` global without ever assigning it, so the progress/ETA
    display always showed 0 total files.

    Args:
        count: number of files to queue (default matches the original 100000).
    """
    global total_files
    if Config.DEBUG:
        print('Populating threads')
    for x in range(count):
        source = Config.SOURCE_URL % x
        destination = Config.DESTINATION_PATH % x
        # Same console output as before: the original printed the URL.
        print('Queueing %s' % source)
        threads.put(Downloader(source, destination))
    total_files += count

def start_work():
    """Start the first batch of queued Downloader threads (up to MAX_THREADS).

    Subsequent threads are chained by Downloader.run() as workers finish.

    Raises:
        Exception: if no threads were queued.
    """
    global start_time
    if threads.empty():
        raise Exception('No work to be done')
    start_time = time()
    batch = min(threads.qsize(), Config.MAX_THREADS)
    for idx in range(batch):
        if Config.DEBUG:
            print('Starting thread %d' % idx)
        threads.get().start()

# Script entry point: load proxies and user agents, queue all downloads,
# then start the first batch of worker threads (the remainder are chained
# from Downloader.run as each worker finishes).
populate_proxies()
populate_agents()
populate_threads()
start_work()

Upvotes: 0

Views: 726

Answers (1)

Nitin
Nitin

Reputation: 244

The number of threads you are using is very high. Python does not actually run threads in parallel; it switches between them frequently, which merely gives the appearance of parallelism. If the task is CPU-intensive, use multiprocessing; if the task is I/O-bound, threads are useful. Keep the thread count low (roughly 10-70 on a typical quad-core PC with 8 GB of RAM), or the switching overhead will reduce the speed of your code. Check these 2 links:

  1. Stack Over Flow Question
  2. Executive Summary On this page.

Upvotes: 1

Related Questions