Ciasto piekarz

Reputation: 8277

URL fetch gets stuck when multiple URLs are passed

In the code below I am trying to first check the URL's status code, then start the relevant thread, and likewise for adding it to the queue.

However, if there are too many URLs I get a Timeout error. All the code is added below. I also just discovered another bug: if I pass an MP3 file along with some JPEG images, the MP3 file downloads at its correct size but opens as one of the images from the URLs passed.

_fdUtils

def getParser():
    parser = argparse.ArgumentParser(prog='FileDownloader',
        description='Utility to download files from internet')
    parser.add_argument('-v', '--verbose', default=logging.DEBUG,
        help='on by default; pass None or False to suppress shell output')
    parser.add_argument('-st', '--saveTo', default=None, action=FullPaths,
        help='location where you want files to download to')
    parser.add_argument('-urls', nargs='*',
        help='urls of files you want to download.')
    parser.add_argument('-se', nargs='*', default=[1],
        help="split each url passed to urls by the respective split order; "
        "if a url doesn't have a split, a default of 1 is used")
    return parser.parse_args()

def getResponse(url):
    return requests.head(url, allow_redirects=True, timeout=10, headers={'Accept-Encoding': 'identity'})

def isWorkingURL(url):
    response = getResponse(url)
    return response.status_code in [302, 200, 100, 204, 300]

def getUrl(url):
    """ gets the actual url to download file from.
    """
    response = getResponse(url)
    return response.headers.get('location', url)

Error stack trace:

Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 810, in __bootstrap_inner
    self.run()
  File "python/file_download.py", line 181, in run
    _grabAndWriteToDisk(self, split, url, self.__saveTo, 0, self.queue)
  File "python/file_download.py", line 70, in _grabAndWriteToDisk
    resp = requests.get(url, headers={'Range': 'bytes=%s' % irange}, stream=True)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/requests-2.1.0-py2.7.egg/requests/api.py", line 55, in get
    return request('get', url, **kwargs)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/requests-2.1.0-py2.7.egg/requests/api.py", line 44, in request
    return session.request(method=method, url=url, **kwargs)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/requests-2.1.0-py2.7.egg/requests/sessions.py", line 382, in request
    resp = self.send(prep, **send_kwargs)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/requests-2.1.0-py2.7.egg/requests/sessions.py", line 505, in send
    history = [resp for resp in gen] if allow_redirects else []
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/requests-2.1.0-py2.7.egg/requests/sessions.py", line 167, in resolve_redirects
    allow_redirects=False,
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/requests-2.1.0-py2.7.egg/requests/sessions.py", line 485, in send
    r = adapter.send(request, **kwargs)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/requests-2.1.0-py2.7.egg/requests/adapters.py", line 381, in send
    raise Timeout(e)
Timeout: HTTPConnectionPool(host='ia600506.us.archive.org', port=80): Read timed out. (read timeout=<object object at 0x1002b40b0>)

Here's the complete code:

import argparse
import logging
import Queue
import os
import requests
import signal
import socket
import sys
import time
import threading
import utils as _fdUtils
from collections import OrderedDict
from itertools import izip_longest
from socket import error as SocketError, timeout as SocketTimeout

# timeout in seconds
TIMEOUT = 10
socket.setdefaulttimeout(TIMEOUT)

DESKTOP_PATH = os.path.expanduser("~/Desktop")

appName = 'FileDownloader'

logFile = os.path.join(DESKTOP_PATH, '%s.log' % appName)

_log = _fdUtils.fdLogger(appName, logFile, logging.DEBUG, logging.DEBUG, console_level=logging.DEBUG)

queue = Queue.Queue()
STOP_REQUEST = threading.Event()
maxSplits = threading.BoundedSemaphore(3)
threadLimiter = threading.BoundedSemaphore(5)
lock = threading.Lock()

pulledSize = 0
dataDict = {}

def _grabAndWriteToDisk(threadName, url, saveTo, first=None, queue=None, mode='wb', irange=None):
    """ Function to download file..

        Args:
            url(str): url of file to download
            saveTo(str): path where to save file
            first(int): starting byte of the range
            queue(Queue.Queue): queue object to set status for file download
            mode(str): mode of file to be downloaded
            irange(str): range of byte to download

    """
    fileName = _fdUtils.getFileName(url)
    filePath = os.path.join(saveTo, fileName)
    fileSize = _fdUtils.getUrlSizeInBytes(url)
    downloadedFileSize = 0 if not first else first
    block_sz = 8192
    resp = requests.get(url, headers={'Range': 'bytes=%s' % irange}, stream=True)
    for fileBuffer in resp.iter_content(block_sz):
        if not fileBuffer:
            break

        with open(filePath, mode) as fd:
            downloadedFileSize += len(fileBuffer)
            fd.write(fileBuffer)
            mode = 'ab'  # append in binary mode for subsequent chunks

            status = r"%10d  [%3.2f%%]" % (downloadedFileSize, downloadedFileSize * 100. / fileSize)
            status = status + chr(8)*(len(status)+1)
            sys.stdout.write('%s\r' % status)
            time.sleep(.01)
            sys.stdout.flush()
            if downloadedFileSize == fileSize:
                STOP_REQUEST.set()
                queue.task_done()
                _log.debug("Downloaded  %s %s%%  using %s and saved to %s", fileName,
                    downloadedFileSize * 100. / fileSize, threadName.getName(), saveTo)


def _downloadChunk(url, idx, irange, fileName, sizeInBytes):
    _log.debug("Downloading %s for first chunk %s of %s " % (irange, idx+1, fileName))
    pulledSize = irange[-1]
    try:
        resp = requests.get(url, allow_redirects=False,  timeout=TIMEOUT,
                            headers={'Range': 'bytes=%s-%s' % (str(irange[0]), str(irange[-1]))}, 
                            stream=True)
    except (SocketTimeout, requests.exceptions.RequestException) as e:
        _log.error(e)
        return


    chunk_size = str(irange[-1])
    for chunk in resp.iter_content(chunk_size):
        status = r"%10d  [%3.2f%%]" % (pulledSize, pulledSize * 100. / int(chunk_size))
        status = status + chr(8)*(len(status)+1)
        sys.stdout.write('%s\r' % status)
        sys.stdout.flush()
        pulledSize += len(chunk)
        dataDict[idx] = chunk
        time.sleep(.03)
        if pulledSize == sizeInBytes:
            _log.info("%s downloaded %3.0f%%", fileName, pulledSize * 100. / sizeInBytes)

class ThreadedFetch(threading.Thread):
    """ docstring for ThreadedFetch
    """
    def __init__(self, saveTo, queue):
        super(ThreadedFetch, self).__init__()
        self.queue = queue
        self.__saveTo = saveTo

    def run(self):
        threadLimiter.acquire()
        try:
            items = self.queue.get()
            url = items[0]
            split = items[-1]
            fileName = _fdUtils.getFileName(url)

            # grab split chunks in separate thread.
            if split > 1:
                maxSplits.acquire()
                try:

                    sizeInBytes = _fdUtils.getUrlSizeInBytes(url)
                    byteRanges = _fdUtils.getRangeSegements(sizeInBytes, split)
                    filePath = os.path.join(self.__saveTo, fileName)

                    downloaders = [
                        threading.Thread(
                            target=_downloadChunk, 
                            args=(url, idx, irange, fileName, sizeInBytes),
                        )
                        for idx, irange in enumerate(byteRanges)
                        ]

                    # start the chunk threads so they run in parallel
                    for th in downloaders:
                        th.start()

                    # wait for all threads to finish, which ensures
                    # dataDict is fully populated before writing
                    for th in downloaders:
                        th.join()
                    downloadedSize = 0
                    with open(filePath, 'wb') as fh:
                        for _idx, chunk in sorted(dataDict.iteritems()):
                            downloadedSize += len(chunk)
                            status = r"%10d  [%3.2f%%]" % (downloadedSize, downloadedSize * 100. / sizeInBytes)
                            status = status + chr(8)*(len(status)+1)
                            fh.write(chunk)
                            sys.stdout.write('%s\r' % status)
                            time.sleep(.04)
                            sys.stdout.flush()
                            if downloadedSize == sizeInBytes:
                                _log.info("%s, saved to %s", fileName, self.__saveTo)
                    self.queue.task_done()
                finally:
                    maxSplits.release()

            else:
                while not STOP_REQUEST.isSet():
                    self.setName("primary_%s_thread" % fileName.split(".")[0])
                    # if downloading the whole file in a single chunk, no need
                    # to start a new thread, so download directly here.
                    _grabAndWriteToDisk(self, url, self.__saveTo, 0, self.queue)
        finally:
            threadLimiter.release()

def main(appName):
    args = _fdUtils.getParser()

    saveTo = args.saveTo if args.saveTo else DESKTOP_PATH
    # spawn a pool of threads and pass each the queue instance;
    # each url will be downloaded concurrently

    unOrdUrls = dict(izip_longest(args.urls, args.se, fillvalue=1))
    ordUrls = OrderedDict([(k, unOrdUrls[k]) for k in sorted(unOrdUrls, key=unOrdUrls.get, reverse=False) if _fdUtils.isWorkingURL(k, _log) and _fdUtils.notOnDisk(k, saveTo)])
    print "length: %s " % len(ordUrls)
    for i in xrange(len(ordUrls)):
        t = ThreadedFetch(saveTo, queue)
        t.daemon = True
        t.start()

    try:
        # populate queue with data 
        for url, split in ordUrls.iteritems():
            url = _fdUtils.getUrl(url)
            print url
            queue.put((url, int(split)))

        # wait on the queue until everything has been processed
        queue.join()
        _log.info('All tasks completed.')
    except (KeyboardInterrupt, SystemExit):
        _log.critical('! Received keyboard interrupt, quitting threads.')

if __name__ == "__main__":
    # change the name of MainThread.
    threading.currentThread().setName("FileDownloader")
    myapp = threading.currentThread().getName()
    main(myapp)

Upvotes: 0

Views: 249

Answers (1)

abarnert

Reputation: 365935

I see two problems in your code. Since it's incomplete, I'm not sure how it's supposed to work, so I can't promise either one is the particular one you're running into first, but I'm pretty sure you need to fix both.

First:

queue.put((_fdUtils.getUrl(url), int(split)))

That's going to call _fdUtils.getUrl(url) in the main thread, and put the result on the queue. Your comments clearly imply that you intended the downloading to happen on the background threads.

If you wanted to pass a function to be called, just pass the function and its argument as separate members of the tuple, or wrap it up in a closure or a partial:

queue.put((lambda: _fdUtils.getUrl(url), int(split)))
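
Or, equivalently, wrap it in functools.partial. For completeness, here's a minimal sketch of how the consumer side might then look; worker and download are hypothetical names I'm assuming for illustration, not your actual run() method:

from functools import partial

# put a callable on the queue so the (possibly slow) redirect
# resolution happens on a worker thread, not the main thread
queue.put((partial(_fdUtils.getUrl, url), int(split)))

def worker(queue):
    while True:
        get_url, split = queue.get()
        try:
            url = get_url()        # resolve redirects off the main thread
            download(url, split)   # hypothetical download step
        finally:
            queue.task_done()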

Second:

    t = ThreadedFetch(saveTo, queue)
    t.daemon = True
    t.start()

This starts a thread for every URL. That's almost never a good idea. Generally, downloaders don't use more than 4-16 threads at a time, and no more than 2-4 to the same site. You could easily be timing out because you're spamming some site too fast and its server or router is making you back off for a while. Or, with a huge number of requests, you could be flooding your own network and blocking ACKs or even rebooting the router (especially if you have either a cheap home WiFi router or ADSL with a crappy provider).

Also, a much simpler way to do this would be to use a smart pool, like a multiprocessing.dummy.Pool (multiprocessing.dummy means it acts like the multiprocessing module but uses threads) or, even better, a concurrent.futures.ThreadPoolExecutor. In fact, if you look at the docs, a parallel downloader is the first example for ThreadPoolExecutor.
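
As a rough illustration (my sketch, not code from the question), the whole download loop could collapse to something like this with multiprocessing.dummy; fetch_one is an assumed stand-in for the real per-file download logic:

from multiprocessing.dummy import Pool  # Pool API, but backed by threads
import requests

def fetch_one(url):
    # assumed helper: stream one URL to disk and return its local name
    resp = requests.get(url, stream=True, timeout=10)
    resp.raise_for_status()
    fileName = url.rsplit('/', 1)[-1]
    with open(fileName, 'wb') as fh:
        for chunk in resp.iter_content(8192):
            fh.write(chunk)
    return fileName

urls = ['http://example.com/a.jpg', 'http://example.com/b.mp3']  # stand-in list
pool = Pool(4)  # cap concurrency at 4 simultaneous downloads
results = pool.map(fetch_one, urls)
pool.close()
pool.join()

The pool caps concurrency for you, so all the BoundedSemaphore bookkeeping disappears.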

Upvotes: 2
