Reputation: 4512
I'm trying the example code for Python's ThreadPoolExecutor from the concurrent.futures documentation. Specifically:
import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))
How can I re-submit a call to the executor if an exception is raised inside the concurrent.futures.as_completed(future_to_url) loop? Basically, I just want to retry failed jobs.
I've tried simply calling executor.submit(...) again, but it doesn't seem to work.
Any suggestion is welcome, thanks.
Upvotes: 4
Views: 3112
Reputation: 2776
A Future is already resolved once it has failed; the failure is its value. Therefore you cannot re-execute it; you must create a new Future.
Since you have the URL, you can submit a new Future in the error handler and then, after all the Futures in future_to_url have completed, iterate over the new Futures, which you collect in a separate collection.
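A minimal sketch of that idea, assuming load_url and URLS as defined in your question (the retry_to_url dict and the single extra retry pass are illustrative additions on my part, not something prescribed by the API):

import concurrent.futures

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    retry_to_url = {}  # brand-new Futures created for the URLs that failed

    # First pass: handle results and resubmit each failure as a new Future
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception (%s), retrying once' % (url, exc))
            retry_to_url[executor.submit(load_url, url, 60)] = url
        else:
            print('%r page is %d bytes' % (url, len(data)))

    # Second pass: iterate over the retries collected above
    for future in concurrent.futures.as_completed(retry_to_url):
        url = retry_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r failed again: %s' % (url, exc))
        else:
            print('%r page is %d bytes after retry' % (url, len(data)))

You could wrap the resubmission step in a loop to allow more than one retry, but the key point is the same: every retry is a fresh call to executor.submit.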
Upvotes: 2
Reputation: 23583
I had the same question recently, and as @Javier says, I ended up needing fresh futures each time. I also used a while loop, as @Javier hinted at.
Reframing this in terms of your URL-based setup, I wanted an easy way to simulate timeouts/failures, so in the code below, if "fake" is in the URL, the behavior is simulated.
import concurrent.futures
import time
import urllib.request
from random import randint
from collections import defaultdict

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']
URLS = [f"http://fake{i}.com" for i in range(20)]

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    if "fake" in url:
        # Simulate the network: ~50% timeouts, ~20% other errors, else success
        time.sleep(1)
        x = randint(1, 10)
        if x <= 5:
            return {"timeout": True, "error": "SimulatedTimeout", "url": url}
        elif x in [6, 7]:
            return {"error": "SomeOtherError", "url": url}
        else:
            return {"data": "<html>" + str(randint(1, 999999)) + "</html>", "url": url}
    try:
        with urllib.request.urlopen(url, timeout=timeout) as conn:
            data = conn.read()
            return {"data": data, "url": url}
    # except urllib.error.URLError as e:
    except Exception as e:
        if "TimeoutError" in repr(e):
            return {"timeout": True, "error": repr(e), "url": url}
        else:
            return {"error": repr(e), "url": url}

todo = [{"url": url} for url in URLS]
final_results = []
retry_counts = defaultdict(int)
max_retries = 5

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    while True:
        # Submit a fresh future for everything still to do, then reset the list
        future_list = [executor.submit(load_url, item["url"], 60) for item in todo]
        todo = []
        for future in concurrent.futures.as_completed(future_list):
            result = future.result()
            if result.get("data"):
                final_results.append({**result, "retries": retry_counts[result["url"]]})
            elif result.get("error") and not result.get("timeout"):
                final_results.append({**result, "retries": retry_counts[result["url"]]})
            elif result.get("timeout") and retry_counts[result["url"]] < max_retries:
                retry_counts[result["url"]] += 1
                todo.append({"url": result["url"]})
            else:
                final_results.append({**result, "reached_max_retries": True, "retries": retry_counts[result["url"]]})
        if len(final_results) == len(URLS):
            print("Done!")
            break
        else:
            print(f"we are now {len(final_results)} out of {len(URLS)}")
That produced the following output:
we are now 10 out of 20
we are now 11 out of 20
we are now 16 out of 20
we are now 17 out of 20
we are now 18 out of 20
Done!
In [45]: pd.DataFrame.from_records(final_results)
Out[45]:
error url retries data timeout reached_max_retries
0 SomeOtherError http://fake0.com 0 NaN NaN NaN
1 NaN http://fake2.com 0 <html>124983</html> NaN NaN
2 SomeOtherError http://fake3.com 0 NaN NaN NaN
3 NaN http://fake7.com 0 <html>459880</html> NaN NaN
4 SomeOtherError http://fake10.com 0 NaN NaN NaN
5 NaN http://fake13.com 0 <html>598498</html> NaN NaN
6 NaN http://fake15.com 0 <html>477976</html> NaN NaN
7 NaN http://fake16.com 0 <html>748633</html> NaN NaN
8 SomeOtherError http://fake17.com 0 NaN NaN NaN
9 NaN http://fake19.com 0 <html>104853</html> NaN NaN
10 NaN http://fake9.com 1 <html>677035</html> NaN NaN
11 NaN http://fake8.com 2 <html>249557</html> NaN NaN
12 NaN http://fake5.com 2 <html>516063</html> NaN NaN
13 SomeOtherError http://fake6.com 2 NaN NaN NaN
14 SomeOtherError http://fake11.com 2 NaN NaN NaN
15 NaN http://fake12.com 2 <html>66441</html> NaN NaN
16 NaN http://fake1.com 3 <html>604868</html> NaN NaN
17 SomeOtherError http://fake18.com 4 NaN NaN NaN
18 SimulatedTimeout http://fake4.com 5 NaN True True
19 SimulatedTimeout http://fake14.com 5 NaN True True
Upvotes: 1