Reputation: 1794
I have a simple concurrency script setup like so:
import concurrent.futures
import json
import requests
import datetime
import sys
from datetime import datetime
from time import sleep
executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
poll_worker = None
request_headers = {"Content-Type": "application/json"}
def addBulkListings(bulk_listings):
print("Adding listings")
future = executor.submit(runBulkListings, bulk_listings)
future.add_done_callback(addPollBulkID)
# Future callback
def addPollBulkID(future):
if not future.exception():
poll_id = future.result().json()['results']
print("Polling id: %s" % poll_id)
new_future = executor.submit(pollBulkListing, poll_id)
new_future.add_done_callback(callProcessMatches)
print(new_future.result())
else:
print("Error getting Poll ID")
print(future.exception())
# Future callback
def callProcessMatches(future):
print("callProcessMatches")
if not future.exception():
print("Processing matches")
result = future.result()
new_future = executor.submit(processMatches, result.json())
new_future.add_done_callback(finishBulkListing)
else:
print("Error polling")
print(future.exception())
# Future callback
def finishBulkListing(future):
if not future.exception():
print(future.result())
else:
print("Error processing matches")
print(future.exception())
# Executor called
def processMatches(response):
results = []
for product in response['results']:
processResults(product, results)
return results
# Executor called
def pollBulkListing(poll_id):
start = datetime.now()
overtime = False
while not overtime:
response = requests.get(MAIN_URL + poll_id,
headers = request_headers)
if response.status_code == requests.codes.ok:
return response
sleep(5)
overtime = (datetime.now() - start).seconds >= (1 * 60)
raise requests.exceptions.Timeout
# Executor called
def runBulkListings(bulk_listings):
response = requests.post(MAIN_URL,
data=json.dumps(bulk_listings),
headers = request_headers)
response.raise_for_status()
return response
"addBulkListing" is called by another script, which then begins working with the executor. I've had this work when I only call addBulkListing once, but if I call it twice things fail. Something will go wrong in the "addPollBulkID" method. The print statement there will be executed without an exception, but then the program just exits. Nothing in the "callProcessMatches" is called, with or without an exception. Like I said, when I only call addBulkListings once everything is fine.
Guesses: I've been fooling around with this for a while, but am not sure. In the examples I've seen people use:
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
But that creates a context I don't want. I don't have all of my parameters to the "addBulkListings" function when I first begin, and need to be able to add them in without recreating the executor. Maybe I'm misunderstanding something.
Thanks for any help!
Upvotes: 0
Views: 1277
Reputation: 1794
Aha! So, I'll answer my own question incase anybody else needs it. Turns out I was able to fix it by switching to:
executor = concurrent.futures.ProcessPoolExecutor(max_workers=5)
Instead of the ThreadPoolExecutor. I'm thinking there was some memory overlap with the threads I was creating. This StackOverflow answer helped me a lot by pointing me in the right direction (Look at the 2nd answer).
Upvotes: 1