Reputation: 50
I'm trying to write an application that works through a list of database entries, makes an API call for each one, and collects the responses. As soon as one value in the API's JSON response is True for 5 calls, I want the list of those 5 responses. Since the database contains a couple of thousand entries, I want to realise this with multiprocessing. But I'm a beginner at parallelisation, and I can't seem to grasp how it works and how to set the exit condition. Here's what I got:
from multiprocessing.dummy import Pool
import requests

def get_api_response(apikey, result, subscription_id):
    r = requests.get("https://api.example.com/" + subscription_id)
    if r.json()['subscribed'] == True:
        result.append(r.json())
    return result

def pass_args(args):
    foo = get_api_response(*args)
    if foo:
        return foo

def check_response_amount(result):
    if len(result) >= 5:
        pool.terminate()

# One entry looks like that: {"id": 1, "name": "smith", "subscription_id": 123}
db_entries = get_db_entries()
apikey = 'abcd1234'
result = []
request_tuples = [(apikey, result, entry['subscription_id']) for entry in db_entries]

pool = Pool(5)
pool_result = pool.map_async(pass_args, request_tuples, callback=check_response_amount)
pool_result.wait()
pool.close()
pool.join()
The application checks every database entry and returns every API response which has subscribed == True, without ever running the callback. I tried applying the answer from another question (Python Multiprocessing help exit on condition), but couldn't get it to work. Can somebody help me?
Upvotes: 1
Views: 1324
Reputation: 94881
When you use map_async, the callback won't be executed until every work item in the iterable has completed. If you want the callback to execute for every item in request_tuples, rather than only after all of them are done, you need to use apply_async inside a for loop instead:
results = []
for item in request_tuples:
    results.append(pool.apply_async(get_api_response, args=item, callback=check_response_amount))

for result in results:
    result.wait()
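For illustration (this is my own minimal sketch, not code from the question, and work is just a dummy stand-in for the API call), here is the difference in callback timing: apply_async fires its callback as each individual task finishes, while map_async fires its callback once, after the whole iterable is done:

from multiprocessing.dummy import Pool  # same thread-backed pool as in the question
import time

def work(n):
    # stand-in for the real API call
    time.sleep(0.1)
    return n

def per_item(value):
    # fires once per finished task when passed to apply_async
    print("apply_async callback:", value)

def once_at_end(values):
    # fires a single time, with the full result list, when passed to map_async
    print("map_async callback:", values)

pool = Pool(2)
for i in range(3):
    pool.apply_async(work, (i,), callback=per_item)
pool.map_async(work, range(3), callback=once_at_end)
pool.close()
pool.join()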
Additionally, calling pool.terminate isn't going to work the way you want; the items you've already submitted to the pool are going to hang forever once you call it, which will make your script hang, since you're waiting on them to finish before exiting. You can work around this by waiting on the pool to join, rather than waiting on any individual task to finish.
import time
from multiprocessing.dummy import Pool

def get_api_response(apikey, result, subscription_id):
    url = "https://api.example.com/" + str(subscription_id)
    print("IN HERE")
    time.sleep(2)  # simulate a slow API call
    result.append(url)
    return result

def pass_args(args):
    foo = get_api_response(*args)
    if foo:
        return foo

def check_response_amount(result):
    # called once per finished task; result is the shared list returned by get_api_response
    if result and len(result) >= 5:
        print("DONE %s" % result)
        pool.terminate()

def get_db_entries():
    return [{'subscription_id': i} for i in range(100)]

# One entry looks like that: {"id": 1, "name": "smith", "subscription_id": 123}
db_entries = get_db_entries()
apikey = 'abcd1234'
result = []
request_tuples = [(apikey, result, entry['subscription_id']) for entry in db_entries]

pool = Pool(2)
results = []
for item in request_tuples:
    results.append(pool.apply_async(get_api_response, item, callback=check_response_amount))
pool.close()
pool.join()
print("done")
Output:
IN HERE
IN HERE
IN HERE
IN HERE
IN HERE
... (a bunch more of this)...
IN HERE
IN HERE
DONE ['https://api.example.com/1', 'https://api.example.com/0', 'https://api.example.com/2', 'https://api.example.com/3', 'https://api.example.com/4', 'https://api.example.com/5']
done
Note that the result list can end up being a little bigger than you want, since the terminate call won't actually stop in-progress tasks.
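If you need exactly five entries, one option (a sketch of my own, not part of the original answer; the URL and sleep are stand-ins for the real request) is to guard the shared list with a threading.Lock and stop appending once it's full. This doesn't cut the run short the way terminate does, but it caps what gets stored, and you can still combine it with the callback approach above:

import threading
import time
from multiprocessing.dummy import Pool

result = []
result_lock = threading.Lock()

def get_api_response(apikey, result, subscription_id):
    url = "https://api.example.com/" + str(subscription_id)  # stand-in for the real request
    time.sleep(0.1)
    with result_lock:
        if len(result) < 5:   # ignore anything past the first five responses
            result.append(url)
    return result

pool = Pool(2)
for i in range(100):
    pool.apply_async(get_api_response, ('abcd1234', result, i))
pool.close()
pool.join()
print(result)  # at most five entries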
Upvotes: 1