Jonas Castanheira
Jonas Castanheira

Reputation: 313

ThreadPoolExecutor and concurrent.futures hangs / delays adding completed futures to as_completed

I have what seems to be a tricky question / problem to solve in a Python application I have been assigned to maintain. I'm not a Python expert and haven't seen this problem clearly mentioned or talked about in other related threads, so I'm asking a new question.

The application in question does HTTP GET requests to several APIs and services from these APIs in a non-negligible scale (tens to hundreds of thousands of requests daily).

I am using Python's apscheduler (more precisely, BackgroundScheduler from apscheduler.schedulers.background) to run these requests in background jobs in a scheduled way. Sometimes a job involves doing 1_500 requests, sometimes in excess of 20_000 and sometimes in the vicinity of 150_000 requests.

Given the request volume, I'm also using ThreadPoolExecutor (given that tasks are I/O bound) to run these requests concurrently, and yield results (that might be important) to the function that calls the executor.

Below is a simplified snippet of the code used by the scheduled tasks to submit requests:

...

def submit_call(my_request_function, query_params_list, name):
  data = pandas.DataFrame() # creates DataFrame (similar to excel document)
  fails = []
  blanks = []
  count = 0
  start_time = datetime.datetime.now()
  ...

  for result in submit_concurrently(my_request_function, query_params_list):
    ...
    if isinstance(result, list):
      for i in result:
        data = data.append(i, ignore_index=True)
    else:
      data = data.append(result, ignore_index=True) # adds results to data

  return data

def submit_concurrently(fn, *iterables):
  ...

  executor = concurrent.futures.ThreadPoolExecutor()
  futures_list = []

  for iterable in iterables:
    futures_list += [executor.submit(fn, i) for i in iterable]

  for f in concurrent.futures.as_completed(futures_list):
    yield f.result()

Where my_request_function is a function that specifies which API and service I want to query, and query_params_list is a list comprised of several sets of query params, meaning that the same API and service will be queried several times with different params. Results from the requests are saved as a spreadsheet (among other stuff) after all requests have been made.

The problem: There's a gradual delay happening on the application somewhere between the call to executor.submit(fn, i) and concurrent.futures.as_completed(futures_list). I've traced the origin to this point after a lot of debugging and making sure that the delay is not on the GET request (the obvious prime suspect), before or after it. This delay is also gradual and only noticeable in larger jobs (with thousands of requests).

I have confirmed the following situation through logs and debugging: function fn called by executor.submit(fn, i) will return a value on a certain datetime but concurrent.futures will only mark that as completed half an hour later. Jobs start with no delay i.e. results are placed on the results spreadsheet in the same second they are returned from the function. After 1k requests, it takes more than 2 minutes from the moment a result is returned to the moment yield f.result() is called (i.e. execution flow gets to that line of code). After 15k requests, the delay is already on the 30-minute mark.

Does anyone have any insights as to:

I know it's a bigger read than usual for a StackOverflow question, so thanks in advance!

Upvotes: 0

Views: 266

Answers (0)

Related Questions