Jim Clam

Reputation: 117

Efficiently check millions of Image URLs in Python

I have a TSV file with over 3 million item rows. Each item has an id, a group, and a URL, and the file is sorted by the group column.

e.g.:

x1    gr1    {some url}/x1.jpg
x2    gr1    {some url}/x2.jpg
x3    gr2    {some url}/x1.jpg  

I load the file in a Python script and need to check that the URL of every item in a group returns status 200 OK before loading those items into a database. I thought of using processes to do the URL checks (I don't have much experience with this, so I'm not sure it's even a good idea).

My logic at the moment: fill array a1 with the items of gr1 -> pass each item in a1 to a new process -> that process checks for 200 and puts the item into array a2 if it is OK -> when all items in a1 are checked, push a2 to the DB (along with other stuff) -> repeat for the next group.

This takes about 30 minutes for 100,000 items. The bottleneck is the URL check; without it the script is lightning fast in comparison. What I have so far:

import csv
import re
import requests
import multiprocessing
from pymongo import MongoClient
import sys

#Placeholder: the actual MongoDB connection/collection is not shown here
batches = MongoClient().mydb.batches

#Load in Data
f = open('../tsvsorttest.tsv', 'rb')
reader = csv.reader(f, delimiter='\n')

#Get the first group name
currGroup = re.split(r'\t', next(reader)[0].decode('utf8'))[1]
currGroupNum = 0 
items = []
checkedItems = []

#Check the URL; if it returns 200, add the item to checkedItems
def check_url(newitem):
    if requests.get(newitem['item_url']).status_code == 200:
        print('got an ok!')
        checkedItems.append(newitem)
    global num_left
    num_left -= 1


def clear_img(checkitems):
    for thisItem in checkitems:
        #NOTE: target=check_url(thisItem) calls check_url right here in the parent,
        #so the checks actually run one after another; the Process itself does nothing
        p = multiprocessing.Process(target=check_url(thisItem))
        p.start()

#Start the loop, use i to keep track of the iteration count
for i, utf8_row in enumerate(reader):
    unicode_row = utf8_row[0].decode('utf8')

    x = re.split(r'\t', unicode_row)

    item = {"id": x[0],
            "group": x[1],
            "item_url": x[2]
            }
    if currGroup != x[1]:
        y = len(items)
        print('items length is ' + str(y))

        #Don't want single-item groups
        if y > 1:
            print('beginning url checks')
            num_left = len(items)

            clear_img(items)
            #Busy-wait until every item in the group has been checked
            while num_left != 0:
                print('Waiting')

            num_left = 0
            batch = {"vran": currGroup,
                     "bnum": currGroupNum,
                     "items": checkedItems,
                     }
            if len(checkedItems) > 0:
                batches.insert_one(batch)
                currGroupNum += 1

        currGroup = x[1]
        items = []
        checkedItems = []

    items.append(item)

    if i % 100 == 0:
        print "Milestone: " + str(i)

print "done"

Other considerations: split the original TSV into about 30 separate TSV files and run the batch script 30 times in parallel. Would this make a difference?

Upvotes: 1

Views: 1335

Answers (2)

dnswlt

Reputation: 3105

It was already mentioned that you should try to use HEAD instead of GET. That will avoid having to download the images. Moreover, you seem to be spawning a separate process per request, which is also inefficient.

I don't think that using asyncio is really required here, performance-wise. The solution using a plain thread pool (not even a process pool) is a tad simpler to grasp, IMHO :) Plus, it's available in Python 2.7.

import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
import csv
from collections import defaultdict

def read_rows(file):
    with open(file) as f_in:
        return [row for row in csv.reader(f_in, delimiter='\t')]

def check_url(inp):
    """Gets called by workers in thread pool. Checks for existence of URL."""
    id, grp, url = inp
    def chk():
        try:
            return requests.head(url).status_code == 200
        except IOError as e:
            return False
    return (id, grp, url, chk())

if __name__ == '__main__':
    d = defaultdict(lambda: [])
    with ThreadPoolExecutor(max_workers=20) as executor:
        future_to_input = {executor.submit(check_url, inp): inp for inp in read_rows('urls.txt')}
        for future in as_completed(future_to_input):
            id, grp, url, res = future.result()
            d[grp].append((id, url, res))
    # do something with your d (e.g. sort appropriately, filter those with len(d[grp]) <= 1, ...)
    for g, bs in d.items():
        print(g)
        for id, url, res in bs:
            print("  %s %5s %s" % (id, res, url))

As you can see, I process each row of the CSV input individually, and do the grouping on the results (using d), not on the input. Mostly a matter of taste, I guess. You might want to play around with max_workers=20 and possibly increase it.
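For instance, tying this back to the DB step in your question, the post-processing of d could look roughly like this (the Mongo URI and collection name are placeholders, and the "vran"/"bnum"/"items" batch layout is simply copied from your code):

from pymongo import MongoClient

def store_batches(d, mongo_uri="mongodb://localhost:27017"):
    """Sketch: keep only the URLs that checked out, skip single-item groups,
    and insert one batch document per group, mirroring the question."""
    batches = MongoClient(mongo_uri).mydb.batches
    group_num = 0
    for grp, rows in d.items():
        # rows are (id, url, res) tuples as built above
        ok_rows = [(id_, url) for id_, url, res in rows if res]
        if len(ok_rows) <= 1:
            continue
        batches.insert_one({
            "vran": grp,
            "bnum": group_num,
            "items": [{"id": id_, "item_url": url} for id_, url in ok_rows],
        })
        group_num += 1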

Upvotes: 2

Jonathan von Schroeder

Reputation: 1703

  1. As you do not need the actual images, using a HEAD request should improve the speed. If the response is neither 200 nor 404, HEAD may not be allowed (405), and you simply try again with a GET request.
  2. You currently wait for the whole current group to finish before starting any new tasks. Generally it would be preferable to keep the number of running requests roughly constant. You probably also want to increase the pool of workers drastically; since the tasks are mostly I/O-bound, however, I'd suggest something along the lines of 3 (i.e. asynchronous I/O).
  3. If you are open to using Python 3, you can make use of its excellent support for asynchronous I/O (https://docs.python.org/3/library/asyncio.html) by using https://pypi.python.org/pypi/aiohttp:
import asyncio
from aiohttp import ClientSession, Timeout
import csv
import re
from threading import Thread
from queue import Queue
from time import sleep

async def check(url, session):
    try:
        with Timeout(10):
            async with session.head(url) as response:
                if response.status == 200:
                    return True
                elif response.status == 404:
                    return False
                else:
                    async with session.get(url) as response:
                        return (response.status == 200)
    except:
        return False



def worker(q):
    while True:
        f = q.get()
        try:
            f()
        except Exception as e:
            print(e)
        q.task_done()

q = Queue()
for i in range(4):
    t = Thread(target=worker, args=(q,))
    t.daemon = True
    t.start()

def item_ok(url):
    #Do something
    sleep(0.5)
    pass

def item_failed(url):
    #Do something
    sleep(0.5)
    pass

def group_done(name,g):
    print("group %s with %d items done (%d failed)\n" %
          (name,g['total'],g['fail']))

async def bound_check(sem, item, session, groups):
    async with sem:
        g = groups[item["group"]]
        if (await check(item["item_url"], session)):
            g["success"] += 1
            q.put(lambda: item_ok(item["item_url"]))
        else:
            g["fail"] += 1
            q.put(lambda: item_failed(item["item_url"]))
        if g["success"] + g["fail"] == g['total']:
            q.put(lambda: group_done(item['group'],g))
        bound_check.processed += 1
        if bound_check.processed % 100 == 0:
            print ("Milestone: %d\n" % bound_check.processed)

bound_check.processed = 0

groups = {}

async def run(max_pending=1000):
    #Choose such that you do not run out of FDs
    sem = asyncio.Semaphore(max_pending)

    f = open('./test.tsv', 'r',encoding = 'utf8')
    reader = csv.reader(f, delimiter='\n')

    tasks = []

    async with ClientSession() as session:
        for _, utf8_row in enumerate(reader):
            unicode_row = utf8_row[0]
            x = re.split(r'\t', unicode_row)
            item = {"id": x[0],"group": x[1],"item_url": x[2]}
            if not item["group"] in groups:
                groups[item["group"]] = {'total'    : 1,
                                         'success'  : 0,
                                         'fail'     : 0,
                                         'items'    : [item]}
            else:
                groups[item["group"]]['total'] += 1
                groups[item["group"]]['items'].append(item)
            task = asyncio.ensure_future(bound_check(sem, item, session, groups))
            tasks.append(task)

        responses = asyncio.gather(*tasks)
        await responses

loop = asyncio.get_event_loop()
loop.run_until_complete(run())
q.join()

print("Done")

Upvotes: 3
