Reputation: 1552
I am trying to process data in parallel using IPython's parallel processing. I am following the instructions given by @minrk in his answer to the question on how to get intermediate results in ipython parallel processing. Since the data is heterogeneous, some of the processing tasks finish sooner than others, and I would like to save their results as soon as they become available. I do this in the following fashion:
from IPython.parallel import Client

def specialfunc(param):
    import time
    if param > 8:
        raise IOError
    else:
        time.sleep(param)
        return param

client = Client()
balanced = client.load_balanced_view()
balanced.block = False
param_list = range(10)  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
asyncmap = balanced.map_async(specialfunc, param_list, ordered=False)
I can then loop over asyncmap and results become available when they are ready:
for i in asyncmap:
    print i
The trouble is that my code sometimes throws exceptions (the example above forces an IOError when the calling parameter exceeds 8) which I would like to deal with. However, as soon as one of the engines throws a wobbly, the whole asyncmap 'appears' to be finished.
I actually noticed that when I interrogate asyncmap.metadata I can figure out which message gave an error (asyncmap.metadata[i]['pyerr']), but I don't know how to keep waiting for the remaining results as they come in.
So my question is: how should I process results arriving asynchronously from my engines, even if they do sometimes throw exceptions? How do I catch the exceptions in the engines without upsetting the waiting for results in the controller?
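For illustration, here is roughly what happens with the naive fix of wrapping the loop in try/except (a sketch; the exact exception type is the client's wrapper around the engine's IOError, so I just catch Exception here):

try:
    for i in asyncmap:
        print i
except Exception as err:
    # the remote IOError surfaces here as soon as its result is yielded,
    # and the for loop ends with it, so later results are never seen
    print "caught %r, but iteration has stopped" % err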
Upvotes: 1
Views: 416
Reputation: 1065
I know it sounds sort of stupid, but you could return a special value to indicate an error, say -1, None, or a string. To get around map_async, what I have done is to loop through the parameters, firing each one with apply_async and storing the result in a list. Then I loop through the list, trying to get the results and processing them one at a time. It looks something like this:
from IPython import parallel

n_cores = len(c.ids)
calls = []
for n, p in enumerate(params):
    core = c.ids[n % n_cores]
    calls.append(c[core].apply_async(f, p))

# then you get the results
while calls:
    for call in calls[:]:  # iterate over a copy so we can remove as we go
        try:
            result = call.get(1e-3)
            process(result)
            calls.remove(call)
            # in the case your call failed, you can apply_async again
            # and append the new call to calls
        except parallel.TimeoutError:
            pass
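To make the special-value idea concrete, a minimal sketch of a wrapper (the name safe_f is mine, not from the question) that turns any engine-side exception into a sentinel, so get() never raises a remote error:

def safe_f(p):
    # run the real task, but convert any exception into a sentinel value
    # so the client never sees a remote error
    try:
        return f(p)
    except Exception as e:
        return ('error', p, repr(e))  # or simply None / -1

You would then submit safe_f instead of f and check the returned value inside process().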
Alternatively, use c[core].apply() and check the calls with call.ready(). Basically the same thing without exception handling. The annoying thing is that this takes up a lot of memory, as the results and other dicts of every call are hard to clear.
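A hedged sketch of that polling variant, assuming the same calls list and process function as above:

# poll ready() instead of catching TimeoutError
while calls:
    for call in calls[:]:
        if call.ready():
            process(call.get())  # will still raise if the remote call failed
            calls.remove(call)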
I was doing a similar thing here and I decided map_async just didn't work for me. This might be relevant too, in case you decide to go for this approach.
Cheers.
PS: I think this is essentially what you implemented above, but I find it more natural to deal with the calls separately rather than stacking them into the map, especially if you might want to reprocess some of them later on.
Upvotes: 1
Reputation: 1552
Inspired by ipython/*/examples/parallel/customresults.py, I came up with this solution:
from IPython import parallel

asyncmap = balanced.map(specialfunc, param_list, ordered=False)

# create original mapping of msg_ids to parameters
# maybe just a quick way to find which parameter gave what result
msg_ids_to_parameters = dict(zip(asyncmap.msg_ids, param_list))

pending = set(asyncmap.msg_ids)  # all queued jobs are pending
while pending:  # we'll come back as long as finished jobs haven't been looked at yet
    try:
        client.wait(pending, 1e-3)
    except parallel.TimeoutError:
        # ignore timeouterrors, since they only mean that at least one isn't done
        pass
    # finished is the set of msg_ids that are complete
    finished = pending.difference(client.outstanding)
    # update pending to exclude those that just finished
    pending = pending.difference(finished)
    for msg_id in finished:
        # we know these are done, so don't worry about blocking
        ar = client.get_result(msg_id)
        # check whether any exception occurred when the code ran on the engine
        if ar.metadata['pyerr'] is None:
            print "job id %s finished on engine %i" % (msg_id, ar.engine_id)
            print "and results for parameter %i:" % msg_ids_to_parameters[msg_id]
            # note that each job in a map always returns a list of length chunksize
            # even if chunksize == 1
            for res in ar.result:
                print " item %i \n" % res
        else:
            print "this went wrong for %i (%s)" % (msg_ids_to_parameters[msg_id], ar.metadata['pyerr'])
Essentially, the change from the example code was to look at the metadata and see whether an error has been recorded, and only if not to go ahead and retrieve the result through ar.result.
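If you also want to resubmit the parameters that failed, a sketch along the same lines might look like this (failed_params and retrymap are my names, not from the example code; this only makes sense if the failures are transient, of course):

# after the while loop: collect parameters whose jobs recorded an error
failed_params = [msg_ids_to_parameters[m] for m in asyncmap.msg_ids
                 if client.get_result(m).metadata['pyerr'] is not None]
if failed_params:
    # resubmit through the same load-balanced view and process as before
    retrymap = balanced.map(specialfunc, failed_params, ordered=False)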
Upvotes: 0