user3693559


`multiprocessing` `starmap_async` only calls callback once?

I have the following code, which creates a pool for 4 workers and calls a worker method. The code works fine for the most part: when running, I see that different workers are being called to process the work. However, calc_completed is only called once at the very end, when all workers are complete. Is this expected behaviour? I would have expected the callback to happen when each worker is completed.

import multiprocessing as mp

def calculate_worker(x, y):
    print('working...')
    ...

def calc_completed(result):
    print('completed: %s' % str(result))

def calc_errored(result):
    print('error: %s' % str(result))

if __name__ == '__main__':
    start, stop, step = 1, 1000, 1
    ranges = [(n, min(n+step, stop)) for n in range(start, stop, step)]

    pool = mp.Pool(processes=8)

    res = pool.starmap_async(calculate_worker, ranges,
                             callback=calculate_worker, error_callback=calc_completed)  

    pool.close()
    pool.join()
    d = res.get()       
    print(d)

Upvotes: 2

Views: 3143

Answers (1)

sophros

Reputation: 16660

In your code calc_completed is passed as error_callback, so it would only be called if an error were encountered during the execution of the mapped function (here: calculate_worker).

Another issue in your code is that you are both running the calculate_worker function in parallel and using it as the callback. This does not make much sense, as calculate_worker would be called twice: first as a worker function, and second as the function that reports that the calculation has finished. You should have two different functions there.

Given the functions in the snippet you provided I would change it the following way:

res = pool.starmap_async(calculate_worker, ranges,
                         callback=calc_completed, 
                         error_callback=calc_errored)  
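
On the question in the title: with starmap_async the callback is applied to the result of the whole map, so it fires once and receives the full list of return values. If you want a callback per task, one option is to submit each task with apply_async instead. Below is a minimal sketch of the difference; the body of calculate_worker is a stand-in (your original is elided), and run_starmap and run_apply are helper names made up for this illustration.

```python
import multiprocessing as mp


def calculate_worker(x, y):
    # Stand-in body (assumption): the original worker body is not shown.
    return x + y


def run_starmap(pool, pairs):
    """Collect every value passed to the starmap_async callback."""
    calls = []
    res = pool.starmap_async(calculate_worker, pairs, callback=calls.append)
    res.wait()  # the callback has already run by the time wait() returns
    return calls


def run_apply(pool, pairs):
    """Submit each pair separately so the callback fires once per task."""
    calls = []
    pending = [pool.apply_async(calculate_worker, args, callback=calls.append)
               for args in pairs]
    for p in pending:
        p.wait()
    return calls


if __name__ == '__main__':
    pairs = [(1, 2), (3, 4), (5, 6)]
    with mp.Pool(processes=2) as pool:
        # One callback call, holding the whole result list.
        print(run_starmap(pool, pairs))
        # One callback call per task; completion order is not guaranteed.
        print(sorted(run_apply(pool, pairs)))
```

The callbacks run in the parent process, which is why appending to a plain list works here. Keep them short, since a slow callback delays the handling of other results.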

If you want to test if calc_errored is called appropriately then you can introduce some random errors in the calculate_worker function to see if it is going to be handled, e.g.

def calculate_worker(x, y):
    if x % 7:  # true whenever x is not a multiple of 7
        x / (y - y)  # division by zero
    print('working...')
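
To see which callback fires, a small self-contained check like the one below may help. It is only a sketch: the worker body and its failure rule (raising on multiples of 7) are assumptions for the demo, and run_with_error is a hypothetical helper name. When any task raises, error_callback is called once with the exception instance and the success callback is not called at all.

```python
import multiprocessing as mp


def calculate_worker(x, y):
    # Assumed failure rule for the demo: multiples of 7 raise an error.
    if x % 7 == 0:
        raise ValueError('bad input: %d' % x)
    return x + y


def run_with_error(pool, pairs):
    """Return the values seen by the success and the error callback."""
    completed, errored = [], []
    res = pool.starmap_async(calculate_worker, pairs,
                             callback=completed.append,
                             error_callback=errored.append)
    res.wait()  # wait() does not raise; res.get() would re-raise the error
    return completed, errored


if __name__ == '__main__':
    with mp.Pool(processes=2) as pool:
        completed, errored = run_with_error(pool, [(1, 2), (7, 8), (3, 4)])
    print('completed calls:', completed)
    print('error calls:', errored)
```

Note that calling res.get() after a failure re-raises the exception in the parent, so wrap it in try/except if you rely on error_callback for reporting.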

Upvotes: 2
