Reputation: 31013
Is there a way to re-send a piece of data for processing, if the original computation failed, using a simple pool?
import random
from multiprocessing import Pool

def f(x):
    if random.getrandbits(1):
        raise ValueError("Retry this computation")
    return x*x

p = Pool(5)
# If one of these f(x) calls fails, retry it with another (or same) process
p.map(f, [1,2,3])
Upvotes: 13
Views: 6330
Reputation: 8617
You can use a Queue to feed failures back into the Pool through a loop in the initiating Process:
import multiprocessing as mp
import random

def f(x):
    if random.getrandbits(1):
        # on failure / exception catch
        f.q.put(x)
        return None
    return x*x

def f_init(q):
    # runs once in every worker process; gives f access to the shared queue
    f.q = q

def main(pending):
    total_items = len(pending)
    successful = []
    failure_tracker = []

    q = mp.Queue()
    p = mp.Pool(None, f_init, [q])
    results = p.imap(f, pending)
    retry_results = []
    while len(successful) < total_items:
        successful.extend([r for r in results if r is not None])
        successful.extend([r for r in retry_results if r is not None])

        failed_items = []
        while not q.empty():
            failed_items.append(q.get())

        if failed_items:
            failure_tracker.append(failed_items)
            # resubmit the failures to the same pool
            retry_results = p.imap(f, failed_items)
    p.close()
    p.join()

    print("Results: %s" % successful)
    print("Failures: %s" % failure_tracker)

if __name__ == '__main__':
    main(range(1, 10))
The output is like this:
Results: [1, 4, 36, 49, 25, 81, 16, 64, 9]
Failures: [[3, 4, 5, 8, 9], [3, 8, 4], [8, 3], []]
A Pool can't be shared between multiple processes, hence this Queue-based approach. If you try to pass the pool as a parameter to the pool's own processes, you will get this error:
NotImplementedError: pool objects cannot be passed between processes or pickled
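In CPython this message comes from Pool.__reduce__, so you can reproduce it without dispatching any work at all, simply by trying to pickle the pool. A minimal sketch:

import pickle
from multiprocessing import Pool

if __name__ == '__main__':
    p = Pool(2)
    try:
        # Sending the pool to a worker would require pickling it,
        # which is exactly what fails
        pickle.dumps(p)
    except NotImplementedError as e:
        print(e)   # pool objects cannot be passed between processes or pickled
    p.close()
    p.join()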
You could alternatively perform a few immediate retries inside your function f, to avoid the synchronisation overhead. It really comes down to how long your function should wait before retrying, and how likely a retry is to succeed if attempted immediately; a sketch of that follows.
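As a rough sketch of that idea, here is a drop-in replacement for f (and its initializer) from the code above. The attempts count of 3 is arbitrary; anything still failing after the in-process attempts is handed back through the queue for a pool-level retry:

import random

def f(x, attempts=3):
    # Try a few times in-process first, so most transient failures
    # never reach the retry loop in main()
    for _ in range(attempts):
        if random.getrandbits(1):   # simulated transient failure
            continue
        return x*x
    f.q.put(x)   # still failing after all attempts: report it for resubmission
    return None

def f_init(q):
    f.q = q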
Old answer: For the sake of completeness, here is my old answer. It isn't as optimal as resubmitting directly into the pool, but it might still be relevant depending on the use case, because it provides a natural way to deal with, or limit, n-level retries:
You can use a Queue to aggregate failures and resubmit them at the end of each run, over multiple runs:
import multiprocessing as mp
import random

def f(x):
    if random.getrandbits(1):
        # on failure / exception catch
        f.q.put(x)
        return None
    return x*x

def f_init(q):
    f.q = q

def main(pending):
    run_number = 1
    while pending:
        jobs = pending
        pending = []

        q = mp.Queue()
        p = mp.Pool(None, f_init, [q])
        results = p.imap(f, jobs)
        p.close()
        p.join()

        # collect the items that failed in this run
        failed_items = []
        while not q.empty():
            failed_items.append(q.get())

        successful = [r for r in results if r is not None]
        print("(%d) Succeeded: %s" % (run_number, successful))
        print("(%d) Failed: %s" % (run_number, failed_items))
        print()
        pending = failed_items
        run_number += 1

if __name__ == '__main__':
    main(range(1, 10))
with output like this:
(1) Succeeded: [9, 16, 36, 81]
(1) Failed: [2, 1, 5, 7, 8]
(2) Succeeded: [64]
(2) Failed: [2, 1, 5, 7]
(3) Succeeded: [1, 25]
(3) Failed: [2, 7]
(4) Succeeded: [49]
(4) Failed: [2]
(5) Succeeded: [4]
(5) Failed: []
Upvotes: 10
Reputation: 19661
If you can (or don't mind) retrying immediately, use a decorator wrapping the function:
import random
from multiprocessing import Pool
from functools import wraps

def retry(f):
    @wraps(f)
    def wrapped(*args, **kwargs):
        while True:
            try:
                return f(*args, **kwargs)
            except ValueError:
                pass
    return wrapped

@retry
def f(x):
    if random.getrandbits(1):
        raise ValueError("Retry this computation")
    return x*x

p = Pool(5)
# If one of these f(x) calls fails, retry it with another (or same) process
p.map(f, [1,2,3])
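If retrying forever is a concern, a bounded variant of the same decorator is a small change. The sketch below (retry written as a factory taking an attempts budget; the name and the count of 5 are just illustrative) re-raises the last error once the budget is exhausted, so a persistent failure surfaces from p.map instead of hanging the worker:

import random
from functools import wraps
from multiprocessing import Pool

def retry(attempts=5):
    # Parameterised variant: give up and re-raise after `attempts` failures
    def decorator(f):
        @wraps(f)
        def wrapped(*args, **kwargs):
            for attempt in range(attempts):
                try:
                    return f(*args, **kwargs)
                except ValueError:
                    if attempt == attempts - 1:
                        raise
        return wrapped
    return decorator

@retry(attempts=5)
def f(x):
    if random.getrandbits(1):
        raise ValueError("Retry this computation")
    return x*x

if __name__ == '__main__':   # guard keeps this safe on platforms that spawn workers
    p = Pool(5)
    print(p.map(f, [1, 2, 3]))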
Upvotes: 24