Reputation: 117
I have a TSV file with over 3 million item rows. Each item has an id, a group, and a URL, and the file is sorted by the group column, e.g.:
x1 gr1 {some url}/x1.jpg
x2 gr1 {some url}/x2.jpg
x3 gr2 {some url}/x1.jpg
I load it into a Python script and need to check that the URLs of all the items in a group return status 200 OK before loading those items into a database. I thought of using processes and doing the URL checks in each one (I don't have much experience with this, so I'm not sure it's even a good idea).
My logic at the moment: fill array a1 with the items of gr1 -> pass each item in a1 to a new process -> that process checks for 200 -> puts the item into array a2 if it is OK -> when all items in a1 are checked, push a2 to the DB (along with other stuff) -> repeat for the next group.
This takes about 30 minutes for 100,000 items. The bottleneck is the URL check; without it the script is lightning fast in comparison. What I have so far:
import csv
import re
import requests
import multiprocessing
from pymongo import MongoClient
import sys
# Load in data
f = open('../tsvsorttest.tsv', 'rb')
reader = csv.reader(f, delimiter='\n')

# Get the first group name
currGroup = re.split(r'\t', next(reader)[0].decode('utf8'))[1]
currGroupNum = 0
items = []
checkedItems = []

# Check the URL; if it returns 200, add the item to checkedItems
def check_url(newitem):
    global num_left
    if requests.get(newitem['item_url']).status_code == 200:
        print('got an ok!')
        checkedItems.append(newitem)
    num_left -= 1

def clear_img(checkitems):
    for thisItem in checkitems:
        p = multiprocessing.Process(target=check_url(thisItem))
        p.start()

# Start the loop, using i to keep track of the iteration count
for i, utf8_row in enumerate(reader):
    unicode_row = utf8_row[0].decode('utf8')
    x = re.split(r'\t', unicode_row)
    item = {"id": x[0],
            "group": x[1],
            "item_url": x[2]}
    if currGroup != x[1]:
        y = len(items)
        print('items length is ' + str(y))
        # Don't want single-item groups
        if y > 1:
            print('beginning url checks')
            num_left = len(items)
            clear_img(items)
            while num_left != 0:
                print('Waiting')
            num_left = 0
            batch = {"vran": currGroup,
                     "bnum": currGroupNum,
                     "items": checkedItems,
                     }
            if len(checkedItems) > 0:
                batches.insert_one(batch)
            currGroupNum += 1
        currGroup = x[1]
        items = []
        checkedItems = []
    items.append(item)
    if i % 100 == 0:
        print("Milestone: " + str(i))
print("done")
Another consideration: split the original TSV into roughly 30 separate TSV files and run the batch script on all of them in parallel. Would this make a difference?
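If that route is taken, the main thing to watch is that no group straddles two files, otherwise one group's items would be checked and inserted as two separate batches. A minimal sketch of such a split, assuming the file is tab-separated and sorted by group as described; the chunk count and output file names are just illustrative:
import csv

def split_tsv(path, n_chunks=30, out_pattern='chunk_%02d.tsv'):
    # Split a group-sorted TSV into roughly equal pieces without letting
    # a group straddle two output files. Names/paths are illustrative,
    # and the whole file is read into memory for simplicity.
    with open(path, encoding='utf8') as f_in:
        rows = list(csv.reader(f_in, delimiter='\t'))
    per_chunk = max(1, len(rows) // n_chunks)
    chunk, chunk_idx, last_group = [], 0, None
    for row in rows:
        # only start a new file once the current group has finished
        if len(chunk) >= per_chunk and row[1] != last_group:
            with open(out_pattern % chunk_idx, 'w', encoding='utf8', newline='') as f_out:
                csv.writer(f_out, delimiter='\t').writerows(chunk)
            chunk, chunk_idx = [], chunk_idx + 1
        chunk.append(row)
        last_group = row[1]
    if chunk:
        with open(out_pattern % chunk_idx, 'w', encoding='utf8', newline='') as f_out:
            csv.writer(f_out, delimiter='\t').writerows(chunk)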
Upvotes: 1
Views: 1335
Reputation: 3105
It was already mentioned that you should try to use HEAD instead of GET. That will avoid having to download the images. Moreover, you seem to be spawning a separate process per request, which is also inefficient.
I don't think that using asyncio is really required here, performance-wise. The solution using a plain thread pool (not even a process pool) is a tad simpler to grasp, IMHO :) Plus, it's available in Python 2.7 (via the futures backport).
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
import csv
from collections import defaultdict

def read_rows(file):
    with open(file) as f_in:
        return [row for row in csv.reader(f_in, delimiter='\t')]

def check_url(inp):
    """Gets called by workers in thread pool. Checks for existence of URL."""
    id, grp, url = inp
    def chk():
        try:
            return requests.head(url).status_code == 200
        except IOError as e:
            return False
    return (id, grp, url, chk())

if __name__ == '__main__':
    d = defaultdict(lambda: [])
    with ThreadPoolExecutor(max_workers=20) as executor:
        future_to_input = {executor.submit(check_url, inp): inp for inp in read_rows('urls.txt')}
        for future in as_completed(future_to_input):
            id, grp, url, res = future.result()
            d[grp].append((id, url, res))

    # do something with your d (e.g. sort appropriately, filter those with len(d[grp]) <= 1, ...)
    for g, bs in d.items():
        print(g)
        for id, url, res in bs:
            print(" %s %5s %s" % (id, res, url))
As you can see, I process each row of the CSV input individually and do the grouping on the results (using d), not on the input. Mostly a matter of taste, I guess. You might want to play around with max_workers=20 and possibly increase it.
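To connect this back to the question: once d is filled, you could apply the same rules the original script uses (drop single-item groups, keep only items whose URL check succeeded) and push each group to MongoDB. A rough sketch, assuming the same batch document shape as in the question; the database and collection names are placeholders:
from pymongo import MongoClient

def store_groups(d):
    batches = MongoClient().mydb.batches  # placeholder database/collection names
    for bnum, (grp, results) in enumerate(sorted(d.items())):
        # keep only items whose URL check returned True
        ok_items = [{"id": id_, "group": grp, "item_url": url}
                    for id_, url, res in results if res]
        # mirror the question's rules: skip single-item groups and
        # groups where no URL came back 200
        if len(results) > 1 and ok_items:
            batches.insert_one({"vran": grp, "bnum": bnum, "items": ok_items})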
Upvotes: 2
Reputation: 1703
import asyncio
from aiohttp import ClientSession, Timeout
import csv
import re
from threading import Thread
from queue import Queue
from time import sleep

# Try a cheap HEAD first; fall back to GET when the server answers
# with something other than 200/404.
async def check(url, session):
    try:
        with Timeout(10):
            async with session.head(url) as response:
                if response.status == 200:
                    return True
                elif response.status == 404:
                    return False
                else:
                    async with session.get(url) as response:
                        return (response.status == 200)
    except:
        return False

# Callbacks are pushed onto this queue and executed by a few worker
# threads, keeping blocking work off the event loop.
def worker(q):
    while True:
        f = q.get()
        try:
            f()
        except Exception as e:
            print(e)
        q.task_done()

q = Queue()
for i in range(4):
    t = Thread(target=worker, args=(q,))
    t.daemon = True
    t.start()

def item_ok(url):
    # Do something
    sleep(0.5)
    pass

def item_failed(url):
    # Do something
    sleep(0.5)
    pass

def group_done(name, g):
    print("group %s with %d items done (%d failed)\n" %
          (name, g['total'], g['fail']))

async def bound_check(sem, item, session, groups):
    async with sem:
        g = groups[item["group"]]
        if (await check(item["item_url"], session)):
            g["success"] += 1
            q.put(lambda: item_ok(item["item_url"]))
        else:
            g["fail"] += 1
            q.put(lambda: item_failed(item["item_url"]))
        if g["success"] + g["fail"] == g['total']:
            q.put(lambda: group_done(item['group'], g))
        bound_check.processed += 1
        if bound_check.processed % 100 == 0:
            print("Milestone: %d\n" % bound_check.processed)

bound_check.processed = 0
groups = {}

async def run(max_pending=1000):
    # Choose such that you do not run out of FDs
    sem = asyncio.Semaphore(max_pending)
    f = open('./test.tsv', 'r', encoding='utf8')
    reader = csv.reader(f, delimiter='\n')
    tasks = []
    async with ClientSession() as session:
        for _, utf8_row in enumerate(reader):
            unicode_row = utf8_row[0]
            x = re.split(r'\t', unicode_row)
            item = {"id": x[0], "group": x[1], "item_url": x[2]}
            if not item["group"] in groups:
                groups[item["group"]] = {'total': 1,
                                         'success': 0,
                                         'fail': 0,
                                         'items': [item]}
            else:
                groups[item["group"]]['total'] += 1
                groups[item["group"]]['items'].append(item)
            task = asyncio.ensure_future(bound_check(sem, item, session, groups))
            tasks.append(task)
        responses = asyncio.gather(*tasks)
        await responses
loop = asyncio.get_event_loop()
loop.run_until_complete(run())
q.join()
print("Done")
Upvotes: 3