Reputation: 947
Is there anything I can do to the code below (I thought sessions would solve this?) to prevent a new TCP connection being created with each GET request? I am hitting around 1,000 requests a second, and after around 10,000 requests I run out of sockets:
import json
from multiprocessing import Pool, cpu_count
from urllib3 import HTTPConnectionPool

def ReqOsrm(url_input):
    ul, qid = url_input
    # A new pool (and hence new sockets) is created on every call
    conn_pool = HTTPConnectionPool(host='127.0.0.1', port=5005, maxsize=1)
    try:
        response = conn_pool.request('GET', ul)
        json_geocode = json.loads(response.data.decode('utf-8'))
        status = int(json_geocode['status'])
        if status == 200:
            tot_time_s = json_geocode['route_summary']['total_time']
            tot_dist_m = json_geocode['route_summary']['total_distance']
            used_from, used_to = json_geocode['via_points']
            out = [qid, status, tot_time_s, tot_dist_m,
                   used_from[0], used_from[1], used_to[0], used_to[1]]
            return out
        else:
            print("Done but no route: %d %s" % (qid, ul))
            return [qid, 999, 0, 0, 0, 0, 0, 0]
    except Exception as err:
        print("%s: %d %s" % (err, qid, ul))
        return [qid, 999, 0, 0, 0, 0, 0, 0]

# run:
pool = Pool(int(cpu_count()))
calc_routes = pool.map(ReqOsrm, url_routes)
pool.close()
pool.join()
HTTPConnectionPool(host='127.0.0.1', port=5005): Max retries exceeded with url: /viaroute?loc=44.779708,4.2609877&loc=44.648439,4.2811959&alt=false&geometry=false (Caused by NewConnectionError(': Failed to establish a new connection: [WinError 10048] Only one usage of each socket address (protocol/network address/port) is normally permitted',))
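From the error it looks like every call to ReqOsrm builds a fresh HTTPConnectionPool, so every request opens fresh sockets and eventually the ephemeral ports run out. What I think I want is to create the pool once per worker process, e.g. via a Pool initializer - a minimal, untested sketch of that idea (init_worker is my own name, not from any library):

import json
from multiprocessing import Pool, cpu_count
from urllib3 import HTTPConnectionPool

conn_pool = None  # one pool per worker process

def init_worker():
    # Runs once in each worker process; every subsequent
    # request in that process reuses the same sockets
    global conn_pool
    conn_pool = HTTPConnectionPool(host='127.0.0.1', port=5005, maxsize=1)

def ReqOsrm(url_input):
    ul, qid = url_input
    response = conn_pool.request('GET', ul)  # no new pool per call
    return [qid, response.status]

if __name__ == '__main__':
    pool = Pool(cpu_count(), initializer=init_worker)
    calc_routes = pool.map(ReqOsrm, url_routes)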
Eric, thank you very much for the response; I think it's exactly what I need. However, I can't quite modify it correctly. The code correctly returns 10,000 responses for the first few cycles, but then it seems to break and returns fewer than 10,000, which leads me to think I implemented the Queue incorrectly?
import csv
import json
import os
import time
import threading
import pandas as pd
from multiprocessing import Process, JoinableQueue, Queue, cpu_count
from urllib3 import HTTPConnectionPool

ghost = 'localhost'
gport = 8989

def CreateUrls(routes, ghost, gport):
    return [
        ["http://{0}:{1}/route?point={2}%2C{3}&point={4}%2C{5}&vehicle=car&calc_points=false&instructions=false".format(
            ghost, gport, alat, alon, blat, blon),
         qid] for qid, alat, alon, blat, blon in routes]

def LoadRouteCSV(csv_loc):
    if not os.path.isfile(csv_loc):
        raise Exception("Could not find CSV with addresses at: %s" % csv_loc)
    else:
        return pd.read_csv(csv_loc, sep=',', header=None, iterator=True, chunksize=1000 * 10)

class Worker(Process):
    def __init__(self, qin, qout, *args, **kwargs):
        super(Worker, self).__init__(*args, **kwargs)
        self._qin = qin
        self._qout = qout

    def run(self):
        # Create threadsafe connection pool, shared by the threads in this process
        conn_pool = HTTPConnectionPool(host=ghost, port=gport, maxsize=10)

        class Consumer(threading.Thread):
            def __init__(self, qin, qout):
                threading.Thread.__init__(self)
                self.__qin = qin
                self.__qout = qout

            def run(self):
                while True:
                    msg = self.__qin.get()
                    ul, qid = msg
                    try:
                        response = conn_pool.request('GET', ul)
                        s = float(response.status)
                        if s == 200:
                            json_geocode = json.loads(response.data.decode('utf-8'))
                            tot_time_s = json_geocode['paths'][0]['time']
                            tot_dist_m = json_geocode['paths'][0]['distance']
                            out = [qid, s, tot_time_s, tot_dist_m]
                        elif s == 400:
                            print("Done but no route for row: ", qid)
                            out = [qid, 999, 0, 0]
                        else:
                            print("Done but unknown error for: ", s)
                            out = [qid, 999, 0, 0]
                    except Exception as err:
                        print(err)
                        out = [qid, 999, 0, 0]
                    self.__qout.put(out)
                    self.__qin.task_done()

        num_threads = 10
        [Consumer(self._qin, self._qout).start() for _ in range(num_threads)]
if __name__ == '__main__':
    done_count = 0
    with open(os.path.join(directory_loc, 'gh_output.csv'), 'w') as outfile:
        wr = csv.writer(outfile, delimiter=',', lineterminator='\n')
        for x in LoadRouteCSV(csv_loc=os.path.join(directory_loc, 'gh_input.csv')):
            routes = x.values.tolist()
            url_routes = CreateUrls(routes, ghost, gport)
            del routes

            stime = time.time()

            qout = Queue()
            qin = JoinableQueue()
            [qin.put(url_q) for url_q in url_routes]
            [Worker(qin, qout).start() for _ in range(cpu_count())]
            # Block until all urls in qin are processed
            qin.join()

            calc_routes = []
            while not qout.empty():
                calc_routes.append(qout.get())

            # Time diagnostics
            dur = time.time() - stime
            print("Calculated %d distances in %.2f seconds: %.0f per second" %
                  (len(calc_routes), dur, len(calc_routes) / dur))
            del url_routes

            wr.writerows(calc_routes)
            done_count += len(calc_routes)
            # Continually update progress in terms of millions
            print("Saved %d calculations" % done_count)
Upvotes: 1
Views: 2011
Reputation: 947
Appreciate the help - my working solution:
class Worker(Process):
    def __init__(self, qin, qout, *args, **kwargs):
        super(Worker, self).__init__(*args, **kwargs)
        self._qin = qin
        self._qout = qout

    def run(self):
        # Create threads to run in process
        class Consumer(threading.Thread):
            def __init__(self, qin, qout):
                threading.Thread.__init__(self)
                self.__qin = qin
                self.__qout = qout

            def run(self):
                # Close once queue empty (otherwise process will linger)
                while not self.__qin.empty():
                    msg = self.__qin.get()
                    ul, qid = msg
                    try:
                        response = conn_pool.request('GET', ul)
                        s = float(response.status)
                        if s == 200:
                            json_geocode = json.loads(response.data.decode('utf-8'))
                            tot_time_s = json_geocode['paths'][0]['time']
                            tot_dist_m = json_geocode['paths'][0]['distance']
                            out = [qid, s, tot_time_s, tot_dist_m]
                        elif s == 400:
                            #print("Done but no route for row: ", qid)
                            out = [qid, 999, 0, 0]
                        else:
                            print("Done but unknown error for: ", s)
                            out = [qid, 999, 0, 0]
                    except Exception as err:
                        print(err)
                        out = [qid, 999, 0, 0]
                    #print(out)
                    self.__qout.put(out)
                    self.__qin.task_done()

        # Create thread-safe connection pool, shared by the threads in this process
        concurrent = 10
        conn_pool = HTTPConnectionPool(host=ghost, port=gport, maxsize=concurrent)
        num_threads = concurrent
        # Start threads (concurrent) per process
        [Consumer(self._qin, self._qout).start() for _ in range(num_threads)]
        # Block until all urls in self._qin are processed
        self._qin.join()
        return
if __name__ == '__main__':
    # Fill queue input
    qin = JoinableQueue()
    [qin.put(url_q) for url_q in url_routes]
    # Queue to collect output
    qout = Queue()
    # Start cpu_count number of processes (which will launch threads and sessions)
    workers = []
    for _ in range(cpu_count()):
        workers.append(Worker(qin, qout))
        workers[-1].start()
    # Block until all urls in qin are processed
    qin.join()
    # Fill routes
    calc_routes = []
    while not qout.empty():
        calc_routes.append(qout.get())
    del qin, qout
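One caveat I'm aware of: `while not self.__qin.empty()` is racy, since between the empty() check and the get() another thread can take the last item and leave this thread blocked forever. A sentinel-based shutdown avoids that; a standalone sketch of the idea (my own variation, not the code I benchmarked):

import threading
from multiprocessing import JoinableQueue

SENTINEL = None  # marker telling a consumer thread to exit

def consume(qin):
    while True:
        msg = qin.get()  # blocks until real work or a sentinel arrives
        if msg is SENTINEL:
            qin.task_done()
            break
        # ... process msg as in Consumer.run above ...
        qin.task_done()

qin = JoinableQueue()
for item in ["work-1", "work-2", "work-3"]:
    qin.put(item)

num_threads = 10
for _ in range(num_threads):
    threading.Thread(target=consume, args=(qin,)).start()
# one sentinel per thread so every consumer shuts down cleanly
for _ in range(num_threads):
    qin.put(SENTINEL)
qin.join()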
Upvotes: 0
Reputation: 10752
I was thinking something more like this. The idea is to spawn a process per core and a pool of threads per process. Each process has a separate connection pool, which is shared among the threads in that process. I don't think you can get a more performant solution without some kind of threading.
from multiprocessing import Pool, cpu_count
import Queue
from urllib3 import HTTPConnectionPool
import threading

def ReqOsrm(url_input):
    # Create threadsafe connection pool, shared by every thread in this process
    conn_pool = HTTPConnectionPool(host='127.0.0.1', port=5005, maxsize=1000)

    # Create consumer thread class
    class Consumer(threading.Thread):
        def __init__(self, queue):
            threading.Thread.__init__(self)
            self._queue = queue

        def run(self):
            while True:
                msg = self._queue.get()
                try:
                    response = conn_pool.request('GET', msg)
                    print response
                except Exception as err:
                    print err
                self._queue.task_done()

    # Create work queue and a pool of workers
    queue = Queue.Queue()
    num_threads = 20
    workers = []
    for _ in xrange(num_threads):
        worker = Consumer(queue)
        worker.daemon = True  # daemonize so the process can exit after queue.join()
        worker.start()
        workers.append(worker)

    for url in url_input:
        queue.put(url)

    queue.join()

url_routes = [
    ["/proc1-0", "/proc1-1"],
    ["/proc2-0", "/proc2-1"],
    ["/proc3-0", "/proc3-1"],
    ["/proc4-0", "/proc4-1"],
    ["/proc5-0", "/proc5-1"],
    ["/proc6-0", "/proc6-1"],
    ["/proc7-0", "/proc7-1"],
    ["/proc8-0", "/proc8-1"],
    ["/proc9-0", "/proc9-1"],
]

pool = Pool(int(cpu_count()))
calc_routes = pool.map(ReqOsrm, url_routes)
pool.close()
pool.join()
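To be clear about the fan-out: pool.map hands each inner list of url_routes to one process, and the 20 threads in that process share that process's connection pool, so the number of open sockets stays bounded by roughly num_threads per process times the number of processes, rather than growing with the number of requests.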
Upvotes: 1