Reputation: 104
I'm trying to build a class that uses multiprocessing + requests to make several requests in parallel. I'm running into an issue where it just hangs and gives me a cryptic error message, and I'm not sure why.
Below is my code. It basically uses a Pool with a callback to put results into a list. I also need a "hard timeout" for each URL, i.e. if a URL takes more than a few seconds to download, I just want to skip it. So I pass a timeout when collecting the Pool results and then diff the URLs attempted against the URLs whose content was returned; the ones that were attempted but not returned are assumed to have failed. Here is my code:
import time
import json
import requests
import sys
from urlparse import parse_qs
from urlparse import urlparse
from urlparse import urlunparse
from urllib import urlencode
from multiprocessing import Process, Pool, Queue, current_process
from multiprocessing.pool import ThreadPool
from multiprocessing import TimeoutError
import traceback
from sets import Set
from massweb.pnk_net.pnk_request import pnk_request_raw
from massweb.targets.fuzzy_target import FuzzyTarget
from massweb.payloads.payload import Payload
class MassRequest(object):

    def __init__(self, num_threads = 10, time_per_url = 10, request_timeout = 10, proxy_list = [{}]):
        self.num_threads = num_threads
        self.time_per_url = time_per_url
        self.request_timeout = request_timeout
        self.proxy_list = proxy_list
        self.results = []
        self.urls_finished = []
        self.urls_attempted = []
        self.targets_results = []
        self.targets_finished = []
        self.targets_attempted = []

    def add_to_finished(self, x):
        self.urls_finished.append(x[0])
        self.results.append(x)

    def add_to_finished_targets(self, x):
        self.targets_finished.append(x[0])
        self.targets_results.append(x)

    def get_urls(self, urls):
        timeout = float(self.time_per_url * len(urls))
        pool = Pool(processes = self.num_threads)
        proc_results = []
        for url in urls:
            self.urls_attempted.append(url)
            proc_result = pool.apply_async(func = pnk_request_raw, args = (url, self.request_timeout, self.proxy_list), callback = self.add_to_finished)
            proc_results.append(proc_result)
        for pr in proc_results:
            try:
                pr.get(timeout = timeout)
            except:
                pool.terminate()
                pool.join()
        pool.terminate()
        pool.join()
        list_diff = Set(self.urls_attempted).difference(Set(self.urls_finished))
        for url in list_diff:
            sys.stderr.write("URL %s got timeout" % url)
            self.results.append((url, "__PNK_GET_THREAD_TIMEOUT"))

if __name__ == "__main__":
    f = open("out_urls_to_fuzz_1mil")
    urls_to_request = []
    for line in f:
        url = line.strip()
        urls_to_request.append(url)
    mr = MassRequest()
    mr.get_urls(urls_to_request)
Here is the function being called by the pool workers:
def pnk_request_raw(url_or_target, req_timeout = 5, proxy_list = [{}]):
    if proxy_list[0]:
        proxy = get_random_proxy(proxy_list)
    else:
        proxy = {}
    try:
        if isinstance(url_or_target, str):
            sys.stderr.write("Requesting: %s with proxy %s\n" % (str(url_or_target), str(proxy)))
            r = requests.get(url_or_target, proxies = proxy, timeout = req_timeout)
            return (url_or_target, r.text)
        if isinstance(url_or_target, FuzzyTarget):
            sys.stderr.write("Requesting: %s with proxy %s\n" % (str(url_or_target), str(proxy)))
            r = requests.get(url_or_target.url, proxies = proxy, timeout = req_timeout)
            return (url_or_target, r.text)
    except:
        #use this to mark failure on exception
        traceback.print_exc()
        #edit: this is the line that was breaking it all
        sys.stderr.out("A request failed to URL %s\n" % url_or_target)
        return (url_or_target, "__PNK_REQ_FAILED")
This seems to work well for smaller sets of URLs, but with a larger set this is the output I get:
Requesting: http://www.sportspix.co.za/ with proxy {}
Requesting: http://www.sportspool.co.za/ with proxy {}
Requesting: http://www.sportspredict.co.za/ with proxy {}
Requesting: http://www.sportspro.co.za/ with proxy {}
Requesting: http://www.sportsrun.co.za/ with proxy {}
Requesting: http://www.sportsstuff.co.za/ with proxy {}
Requesting: http://sportsstuff.co.za/2011-rugby-world-cup with proxy {}
Requesting: http://www.sportstar.co.za/4-stroke-racing with proxy {}
Requesting: http://www.sportstats.co.za/ with proxy {}
Requesting: http://www.sportsteam.co.za/ with proxy {}
Requesting: http://www.sportstec.co.za/ with proxy {}
Requesting: http://www.sportstours.co.za/ with proxy {}
Requesting: http://www.sportstrader.co.za/ with proxy {}
Requesting: http://www.sportstravel.co.za/ with proxy {}
Requesting: http://www.sportsturf.co.za/ with proxy {}
Requesting: http://reimo.sportsvans.co.za/ with proxy {}
Requesting: http://www.sportsvans.co.za/4x4andmoreWindhoek.html with proxy {}
Handled exception:Traceback (most recent call last):
File "mass_request.py", line 87, in get_fuzzy_targets
pr.get(timeout = timeout)
File "/usr/lib/python2.7/multiprocessing/pool.py", line 528, in get
raise self._value
AttributeError: 'file' object has no attribute 'out'
After that last exception, the program hangs and I have to kill it completely. AFAIK I'm never trying to access an attribute named "out" on a file object. How do I fix this? Am I doing something obviously wrong here? Why isn't there a clearer exception?
Upvotes: 1
Views: 761
Reputation: 94891
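I think the problem is in the except block of pnk_request_raw. File objects such as sys.stderr have no out method, so
sys.stderr.out("A request failed to URL %s\n" % url_or_target)
should be
sys.stderr.write("A request failed to URL %s\n" % url_or_target)
The AttributeError is raised inside the worker and then re-raised in the parent by pr.get(), which is why the traceback points at pool.py rather than at your own code.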
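To see how an exception raised inside a worker's except block surfaces at the get() call, here is a minimal sketch. The names broken_worker, fixed_worker and run are hypothetical and not part of the question's code. The request failure is simulated rather than made with requests, and ThreadPool stands in for Pool only to keep the example self-contained. As far as I know, Pool re-raises worker exceptions from get() in the same way.

```python
import sys
from multiprocessing.pool import ThreadPool


def broken_worker(url):
    """Mimics the question's handler: the except block itself raises."""
    try:
        # Simulated failure; the real code would call requests.get() here
        raise IOError("simulated request failure for %s" % url)
    except Exception:
        # sys.stderr has no 'out' method, so this line raises AttributeError
        sys.stderr.out("A request failed to URL %s\n" % url)
        return (url, "__PNK_REQ_FAILED")


def fixed_worker(url):
    """Same handler with the one-word fix: write() instead of out()."""
    try:
        raise IOError("simulated request failure for %s" % url)
    except Exception:
        sys.stderr.write("A request failed to URL %s\n" % url)
        return (url, "__PNK_REQ_FAILED")


def run(worker, url):
    """Run one task in a pool and report what get() hands back."""
    pool = ThreadPool(processes=1)
    try:
        async_result = pool.apply_async(worker, (url,))
        try:
            return ("ok", async_result.get(timeout=5))
        except AttributeError as exc:
            # The worker's exception is re-raised here, in the caller
            return ("error", str(exc))
    finally:
        pool.terminate()
        pool.join()


if __name__ == "__main__":
    print(run(broken_worker, "http://example.com/"))
    print(run(fixed_worker, "http://example.com/"))
```

With broken_worker the caller gets the AttributeError back from get(), and with fixed_worker it gets the failure tuple. Catching a specific exception type around get(), instead of a bare except, also makes it easier to tell a timeout apart from a bug in the worker.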
Upvotes: 3