DrSAR

Reputation: 1552

Processing results from asyncmap as they come in

I am trying to process data in parallel using IPython's parallel processing. I am following the instructions by @minrk in his answer to the question on how to get intermediate results in ipython parallel processing. Since the data is heterogeneous, some of the processing tasks finish sooner than others, and I would like to save their results as soon as they become available. I do this in the following fashion:

from IPython.parallel import Client

def specialfunc(param):
    import time
    if param > 8:
        raise IOError
    else:
        time.sleep(param)
        return param

client = Client()
balanced = client.load_balanced_view()
balanced.block = False
param_list = range(10)   # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
asyncmap = balanced.map_async(specialfunc, param_list, ordered=False)

I can then loop over asyncmap and results become available when they are ready:

for i in asyncmap:
    print i

The trouble is that my code sometimes throws exceptions (the example above forces an IOError when the calling parameter exceeds 8) which I would like to deal with. However, as soon as one of the engines throws a wobbly, the whole asyncmap 'appears' to be finished.
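
To make the failure mode concrete: even wrapping the iteration doesn't help much, because the loop simply stops at the first remote error and the results that are still pending never get looked at by it. Roughly:

try:
    for i in asyncmap:
        print i
except Exception as e:
    # the remote IOError surfaces here and the iteration is over;
    # results that were still pending are never seen by this loop
    print "iteration aborted:", e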

I have actually noticed that by interrogating asyncmap.metadata I can figure out quite well which message gave an error (asyncmap.metadata[i]['pyerr']), but then I don't know how to wait for the results to come in as they do.
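
For illustration, this is roughly the kind of after-the-fact inspection I mean (it tells me what failed, but only once everything has run):

for i, md in enumerate(asyncmap.metadata):
    if md['pyerr'] is not None:
        print "task %i raised: %s" % (i, md['pyerr'])
    else:
        print "task %i finished on engine %s" % (i, md['engine_id'])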

So my question is: how should I process the results arriving asynchronously from my engines, even if they do sometimes throw exceptions? And how do I catch the exceptions in the engines without upsetting the waiting for results in the controller?

Upvotes: 1

Views: 416

Answers (2)

Alex S

Reputation: 1065

I know it sounds sort of stupid, but you could return a special value to indicate an error, say -1 or None or a string. To get around map_async, what I have done is to loop through the parameters, use apply_async, and store the calls in a list. Then I loop through that list trying to get the results and process them one at a time. It looks something like this:

from IPython import parallel   # for parallel.TimeoutError below

n_cores = len(c.ids)
calls = []
for n, p in enumerate(params):
    core = c.ids[n % n_cores]
    calls.append(c[core].apply_async(f, p))

# then you get the results
while calls != []:
    for call in calls[:]:   # iterate over a copy so we can remove items safely
        try:
            result = call.get(1e-3)
            process(result)
            calls.remove(call)
            # in case your call failed, you can apply_async again
            # and append the new call to calls.
        except parallel.TimeoutError:
            pass

Or alternatively use c[core].apply() and check the calls with call.ready(). Basically the same thing, without the exception handling. The annoying thing is that this takes up a lot of memory, as the results and other dicts of every call are hard to clear.
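
As for the special-value idea, a minimal sketch reusing the specialfunc from your question, with None standing in for "this one failed":

def safe_specialfunc(param):
    # same work as specialfunc, but a failure becomes a sentinel instead of an exception
    import time
    try:
        if param > 8:
            raise IOError
        time.sleep(param)
        return param
    except IOError:
        return None   # or -1, or an error string

# this also works with your original map_async, since nothing is ever raised
for res in balanced.map_async(safe_specialfunc, param_list, ordered=False):
    if res is None:
        print "a task failed"   # decide here whether to retry, log, etc.
    else:
        print res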

I was doing a similar thing here and I decided map_async just didn't work for me. This might be relevant too, in case you decide to go for this approach.

Cheers.

PS: I think this is essentially what you implemented above, but I find it more natural to deal with the calls separately rather than stacking them into the map, especially if you might want to reprocess some of them later on.
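
For example, re-submitting failed parameters could look roughly like this (a sketch only; it keeps the parameter next to each call so a failure can simply be queued again):

lbview = c.load_balanced_view()

# keep (parameter, call) pairs so a failed call can be re-submitted later
calls = [(p, lbview.apply_async(f, p)) for p in params]

while calls:
    for p, call in calls[:]:            # iterate over a copy so we can modify calls
        if not call.ready():
            continue
        calls.remove((p, call))
        try:
            process(call.get())          # raises here if the engine raised
        except Exception:
            # re-submit the same parameter and keep waiting for it
            calls.append((p, lbview.apply_async(f, p)))

In practice you would want to cap the number of retries per parameter, otherwise a parameter that always fails keeps the loop alive forever.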

Upvotes: 1

DrSAR

Reputation: 1552

Inspired by ipython/*/examples/parallel/customresults.py, I came up with this solution:

from IPython import parallel   # needed for parallel.TimeoutError below

asyncmap = balanced.map(specialfunc, param_list, ordered=False)

# create the original mapping of msg_ids to parameters:
# a quick way to find which parameter gave which result
msg_ids_to_parameters = dict(zip(asyncmap.msg_ids, param_list))

pending = set(asyncmap.msg_ids) # all queued jobs are pending
while pending:   # we'll come back as long as finished jobs haven't been looked at yet
    try:
        client.wait(pending, 1e-3)
    except parallel.TimeoutError:
        # ignore timeouterrors, since they only mean that at least one isn't done
        pass

    # finished is the set of msg_ids that are complete
    finished = pending.difference(client.outstanding)
    # update pending to exclude those that just finished
    pending = pending.difference(finished)
    for msg_id in finished:
        # we know these are done, so don't worry about blocking
        ar = client.get_result(msg_id)
        # checking whether any exceptions occurred when code ran on the engine
        if ar.metadata['pyerr'] is None:
            print "job id %s finished on engine %i " % (msg_id, ar.engine_id)
            print "and results for parameter %i :" % msg_ids_to_parameters[msg_id]
            # note that each job in a map always returns a list of length chunksize
            # even if chunksize == 1
            for res in ar.result:
                print " item %i \n" % res
        else:
            print('this went wrong for %i (%s)' % (msg_ids_to_parameters[msg_id], ar.metadata['pyerr']))

Essentially, the change from the example code was to look at the metadata and check whether an error has been recorded, and only if not, to go ahead and retrieve the result through ar.result.
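
If this comes up repeatedly, the loop can be folded into a small generator that yields (parameter, error, result) triples as jobs complete; a sketch along the same lines, assuming the same client and msg_ids_to_parameters as above:

from IPython import parallel

def iter_map_results(client, asyncmap, msg_ids_to_parameters):
    """Yield (parameter, pyerr, result_list) as each job of the map finishes."""
    pending = set(asyncmap.msg_ids)
    while pending:
        try:
            client.wait(pending, 1e-3)
        except parallel.TimeoutError:
            pass   # not everything is done yet
        finished = pending.difference(client.outstanding)
        pending = pending.difference(finished)
        for msg_id in finished:
            ar = client.get_result(msg_id)
            param = msg_ids_to_parameters[msg_id]
            if ar.metadata['pyerr'] is None:
                yield param, None, ar.result
            else:
                yield param, ar.metadata['pyerr'], None

for param, err, res in iter_map_results(client, asyncmap, msg_ids_to_parameters):
    if err is None:
        print "parameter %i returned %s" % (param, res)
    else:
        print "parameter %i failed: %s" % (param, err)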

Upvotes: 0
