mptevsion

Reputation: 947

Python Requests - ephemeral port exhaustion

Is there anything I can do to the code below (I thought sessions would solve this?) to prevent a new TCP connection from being created with each GET request? I'm making around 1,000 requests a second, and after roughly 10,000 requests I run out of sockets:

import json
from multiprocessing import Pool, cpu_count
from urllib3 import HTTPConnectionPool

def ReqOsrm(url_input):
    ul, qid = url_input
    # Connection pool is created inside the function, once per call
    conn_pool = HTTPConnectionPool(host='127.0.0.1', port=5005, maxsize=1)
    try:
        response = conn_pool.request('GET', ul)
        json_geocode = json.loads(response.data.decode('utf-8'))
        status = int(json_geocode['status'])
        if status == 200:
            tot_time_s = json_geocode['route_summary']['total_time']
            tot_dist_m = json_geocode['route_summary']['total_distance']
            used_from, used_to = json_geocode['via_points']
            out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
            return out
        else:
            print("Done but no route: %d %s" % (qid, ul))
            return [qid, 999, 0, 0, 0, 0, 0, 0]
    except Exception as err:
        print("%s: %d %s" % (err, qid, ul))
        return [qid, 999, 0, 0, 0, 0, 0, 0]

# run:
pool = Pool(int(cpu_count()))
calc_routes = pool.map(ReqOsrm, url_routes)
pool.close()
pool.join()

HTTPConnectionPool(host='127.0.0.1', port=5005): Max retries exceeded with url: /viaroute?loc=44.779708,4.2609877&loc=44.648439,4.2811959&alt=false&geometry=false (Caused by NewConnectionError(': Failed to establish a new connection: [WinError 10048] Only one usage of each socket address (protocol/network address/port) is normally permitted',))
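
What I expected was that a single pool (or a requests.Session) created once, outside the per-request function, would reuse the same sockets across calls - a minimal sketch of what I had in mind (ReqOsrmShared is just an illustrative name):

import json
from urllib3 import HTTPConnectionPool

# Created once at module level, so every request in this process
# should reuse the same pooled sockets instead of opening new ones.
conn_pool = HTTPConnectionPool(host='127.0.0.1', port=5005, maxsize=10)

def ReqOsrmShared(url_input):
    ul, qid = url_input
    response = conn_pool.request('GET', ul)
    return qid, json.loads(response.data.decode('utf-8'))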


Eric - thanks a lot for the response, I think it's exactly what I need. However, I can't quite get the modification right. The code correctly returns 10,000 responses for the first few cycles, but then it seems to break and returns fewer than 10,000, which leads me to think I implemented the Queue incorrectly?


import csv
import json
import os
import threading
import time
from multiprocessing import Process, JoinableQueue, Queue, cpu_count

import pandas as pd
from urllib3 import HTTPConnectionPool

ghost = 'localhost'
gport = 8989

def CreateUrls(routes, ghost, gport):
    return [
        ["http://{0}:{1}/route?point={2}%2C{3}&point={4}%2C{5}&vehicle=car&calc_points=false&instructions=false".format(
            ghost, gport, alat, alon, blat, blon),
            qid] for qid, alat, alon, blat, blon in routes]


def LoadRouteCSV(csv_loc):
    if not os.path.isfile(csv_loc):
        raise Exception("Could not find CSV with addresses at: %s" % csv_loc)
    else:
        return pd.read_csv(csv_loc, sep=',', header=None, iterator=True, chunksize=1000 * 10)

class Worker(Process):
    def __init__(self, qin, qout, *args, **kwargs):
        super(Worker, self).__init__(*args, **kwargs)
        self._qin = qin
        self._qout = qout

    def run(self):
        # Create threadsafe connection pool
        conn_pool = HTTPConnectionPool(host=ghost, port=gport, maxsize=10)

        class Consumer(threading.Thread):
            def __init__(self, qin, qout):
                threading.Thread.__init__(self)
                self.__qin = qin
                self.__qout = qout

            def run(self):
                while True:
                    msg = self.__qin.get()
                    ul, qid = msg
                    try:
                        response = conn_pool.request('GET', ul)
                        s = float(response.status)
                        if s == 200:
                            json_geocode = json.loads(response.data.decode('utf-8'))
                            tot_time_s = json_geocode['paths'][0]['time']
                            tot_dist_m = json_geocode['paths'][0]['distance']
                            out = [qid, s, tot_time_s, tot_dist_m]
                        elif s == 400:
                            print("Done but no route for row: ", qid)
                            out = [qid, 999, 0, 0]
                        else:
                            print("Done but unknown error for: ", s)
                            out = [qid, 999, 0, 0]
                    except Exception as err:
                        print(err)
                        out = [qid, 999, 0, 0]
                    self.__qout.put(out)
                    self.__qin.task_done()

        num_threads = 10
        [Consumer(self._qin, self._qout).start() for _ in range(num_threads)]

if __name__ == '__main__':
    # directory_loc points at the folder with the input/output CSVs (defined elsewhere)
    done_count = 0
    with open(os.path.join(directory_loc, 'gh_output.csv'), 'w') as outfile:
        wr = csv.writer(outfile, delimiter=',', lineterminator='\n')
        for x in LoadRouteCSV(csv_loc=os.path.join(directory_loc, 'gh_input.csv')):
            routes = x.values.tolist()
            url_routes = CreateUrls(routes, ghost, gport)
            del routes

            stime = time.time()

            qout = Queue()
            qin = JoinableQueue()
            [qin.put(url_q) for url_q in url_routes]
            [Worker(qin, qout).start() for _ in range(cpu_count())]
            # Block until all urls in qin are processed
            qin.join()
            calc_routes = []
            while not qout.empty():
                calc_routes.append(qout.get())

            # Time diagnostics
            dur = time.time() - stime
            print("Calculated %d distances in %.2f seconds: %.0f per second" %
                  (len(calc_routes), dur, len(calc_routes) / dur))
            del url_routes
            wr.writerows(calc_routes)
            done_count += len(calc_routes)
            # Continually update progress
            print("Saved %d calculations" % done_count)

Upvotes: 1

Views: 2011

Answers (2)

mptevsion

Reputation: 947

Appreciate the help - my working solution:

class Worker(Process):
    def __init__(self, qin, qout, *args, **kwargs):
        super(Worker, self).__init__(*args, **kwargs)
        self._qin = qin
        self._qout = qout

    def run(self):
        # Create threads to run in process
        class Consumer(threading.Thread):
            def __init__(self, qin, qout):
                threading.Thread.__init__(self)
                self.__qin = qin
                self.__qout = qout

            def run(self):
                # Close once queue empty (otherwise process will linger)
                while not self.__qin.empty():
                    msg = self.__qin.get()
                    ul, qid = msg
                    try:
                        response = conn_pool.request('GET', ul)
                        s = float(response.status)
                        if s == 200:
                            json_geocode = json.loads(response.data.decode('utf-8'))
                            tot_time_s = json_geocode['paths'][0]['time']
                            tot_dist_m = json_geocode['paths'][0]['distance']
                            out = [qid, s, tot_time_s, tot_dist_m]
                        elif s == 400:
                            #print("Done but no route for row: ", qid)
                            out = [qid, 999, 0, 0]
                        else:
                            print("Done but unknown error for: ", s)
                            out = [qid, 999, 0, 0]
                    except Exception as err:
                        print(err)
                        out = [qid, 999, 0, 0]
                    #print(out)
                    self.__qout.put(out)
                    self.__qin.task_done()

        # Create thread-safe connection pool
        concurrent = 10
        conn_pool = HTTPConnectionPool(host=ghost, port=gport, maxsize=concurrent)
        num_threads = concurrent
        # Start threads (concurrent) per process
        [Consumer(self._qin, self._qout).start() for _ in range(num_threads)]
        # Block until all urls in self._qin are processed
        self._qin.join()
        return

if __name__ == '__main__':
    # Fill queue input
    qin = JoinableQueue()
    [qin.put(url_q) for url_q in url_routes]
    # Queue to collect output
    qout = Queue()
    # Start cpu_count number of processes (which will launch threads and sessions)
    workers = []
    for _ in range(cpu_count()):
        workers.append(Worker(qin, qout))
        workers[-1].start()
    # Block until all urls in qin are processed
    qin.join()
    # Fill routes
    calc_routes = []
    while not qout.empty():
        calc_routes.append(qout.get())
    del qin, qout
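
One thing to watch with the while not self.__qin.empty() loop: a thread can see a non-empty queue, lose the last item to another thread, and then block forever on get(). A sentinel-based shutdown avoids that race - this is a sketch of the idea rather than code I have run:

# After filling qin, enqueue one sentinel per consumer thread
# (cpu_count() processes x 10 threads each, matching `concurrent` above),
# so every thread exits deterministically instead of polling qin.empty().
SENTINEL = None
for _ in range(cpu_count() * 10):
    qin.put(SENTINEL)

# ...and in Consumer.run, replace the empty() check with:
#     msg = self.__qin.get()
#     if msg is SENTINEL:
#         self.__qin.task_done()
#         break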

Upvotes: 0

Eric Conner

Reputation: 10752

I was thinking something more like this. The idea is to spawn a process per core and a pool of threads per process. Each process has a separate connection pool which is shared among the threads in that process. I don't think you can get a more performant solution without some kind of threading.

from multiprocessing import Pool, cpu_count
import queue
import threading

from urllib3 import HTTPConnectionPool


def ReqOsrm(url_input):
    # Create threadsafe connection pool, shared by every thread in this process
    conn_pool = HTTPConnectionPool(host='127.0.0.1', port=5005, maxsize=1000)

    # Create consumer thread class
    class Consumer(threading.Thread):
        def __init__(self, work_queue):
            threading.Thread.__init__(self)
            self._queue = work_queue

        def run(self):
            while True:
                msg = self._queue.get()
                try:
                    response = conn_pool.request('GET', msg)
                    print(response)
                except Exception as err:
                    print(err)
                self._queue.task_done()

    # Create work queue and a pool of workers
    work_queue = queue.Queue()
    num_threads = 20
    workers = []
    for _ in range(num_threads):
        worker = Consumer(work_queue)
        worker.start()
        workers.append(worker)

    for url in url_input:
        work_queue.put(url)

    work_queue.join()

url_routes = [
    ["/proc1-0", "/proc1-1"],
    ["/proc2-0", "/proc2-1"],
    ["/proc3-0", "/proc3-1"],
    ["/proc4-0", "/proc4-1"],
    ["/proc5-0", "/proc5-1"],
    ["/proc6-0", "/proc6-1"],
    ["/proc7-0", "/proc7-1"],
    ["/proc8-0", "/proc8-1"],
    ["/proc9-0", "/proc9-1"],
]

pool = Pool(int(cpu_count()))
calc_routes = pool.map(ReqOsrm, url_routes)
pool.close()
pool.join()
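
One note on the pool sizing: urllib3 keeps at most maxsize connections alive for reuse, so maxsize should be at least num_threads if you want every thread to hold a persistent socket (here 1000 >= 20, which is plenty; matching the two numbers exactly would also work).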

Upvotes: 1
