Peter Toft

Reputation: 707

Python parallel execution - how to debug efficiently?

Below is a Python program that demonstrates how to apply a function func to a set of inputs in parallel using multiprocessing.Pool. There are Np elements to iterate over. The function func merely returns Np minus the index of the element. As shown, I use a queue to return the values from the function when running in parallel mode.

If I set runParallel=False the program can be executed in serial mode.

The program runs fine for both runParallel=False and runParallel=True, but now comes my actual problem: as you can see below, if I set problemIndex lower than Np (e.g. problemIndex=7), I trigger an exception. I divide by zero - stupid me :-)

With runParallel=False I can see the source line number of the bug and catch it directly.

$ python 
Traceback (most recent call last):
  File "", line 63, in <module>
    a = func(argList[p])
  File "", line 22, in func
    ret = 1/(args["index"]-args["problemIndex"])
ZeroDivisionError: integer division or modulo by zero


However, with runParallel=True I just end up in the "Bummer" print section with no indication of the source of the bug. Annoying!

My question is: For runParallel=True, how can I efficiently debug this and get the line number of the buggy code-line back from the Pool()?

import time
import multiprocessing
import sys
import random

# Toggle whether we run parallel or not
runParallel = True

# Problematic index - if less than Np we create an exception
problemIndex = 13

# Number of compute problems
Np = 10

def func(args):
    # Emulate that the function might be fast or slow
    time.sleep(random.randint(1,4))
    ret  = args["Np"] - args["index"]
    # Emulate a bug 
    if args["index"]==args["problemIndex"]:
        ret = 1/(args["index"]-args["problemIndex"])
    # Return data
    if args["runParallel"]:
        # We use a queue thus ordering may not be protected
        args["q"].put((args["index"],ret))
    else:
        return ret

# Return queue used when running parallel
manager = multiprocessing.Manager()
q = manager.Queue()

# Build argument lists
argList = []
for i in range(Np):
    args = {}
    args["index"] = i # index
    args["Np"] = Np   # Number of problems
    args["q"] = q     # return queue for parallel execution mode
    args["problemIndex"] = problemIndex  # if index == problemIndex then func will malfunction
    args["runParallel"] = runParallel    # should we run parallel
    argList.append(args)

#should we run parallel
if runParallel:
    # Run 10 processes in parallel
    p = multiprocessing.Pool(processes=10)
    ret = p.map_async(func, argList)
    # Wait for all workers to finish before inspecting the queue
    ret.wait()
    qLen = q.qsize()
    p.close()
    if not qLen == Np:
        print "Bummer - one of more worker threads broke down",Np,qLen
        sys.exit(0)

resultVector = [None]*Np
for p in range(Np):
    if runParallel:
        (i,a) = q.get(timeout=0.1)
    else:
        i = p
        a = func(argList[p])
    resultVector[i] = a

for i in range(Np):
    print "Index", i, "gives",resultVector[i]

Upvotes: 6

Views: 3730

Answers (3)

Peter Toft

Reputation: 707

John Greenall has given the best solution and the bounty has been paid.

The reason is that his solution does not require a try/except around the central part of the code, i.e. the whole of "func", as radu.ciorba's does. That other approach works too, however.

Since John's solution does not exactly follow the code in my question, I am posting a version of my own code with John's solution applied. Again, credit to John but also to Radu!

# solution
import time
import multiprocessing
import sys
import random
import logging
import traceback

# Toggle whether we run parallel or not
runParallel = True

# Problematic index - if less than Np we create an exception
problemIndex = 14

# Number of compute problems
Np = 10

def func(args):
    # Emulate that the function might be fast or slow
    time.sleep(random.randint(1,4))
    ret  = args["Np"] - args["index"]

    # Emulate a bug 
    if args["index"]==args["problemIndex"]:
        ret = 1/(args["index"]-args["problemIndex"])

    # Return data
    return (args["index"],ret)

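def mpFunctionReportError(args):
    # Run func and put a report on the queue instead of raising
    q = args["q"]
    rslt = {"index":args["index"],
            "result":None,
            "error":None,
            "traceback":None,
            "args":str(args)}
    try:
        rslt["result"] = func(args)
    except Exception as e:
        rslt["result"] = None
        rslt["error"] = e
        rslt["args"] = str(args)
        # Format the traceback in the worker, where it is still available
        rslt["traceback"] = traceback.format_exc()
    q.put(rslt)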

# Return queue used when running parallel
manager = multiprocessing.Manager()
q = manager.Queue()

# Build argument lists
argList = []
for i in range(Np):
    args = {}
    args["index"] = i # index
    args["Np"] = Np   # Number of problems
    args["q"] = q     # return queue for parallel execution mode
    args["problemIndex"] = problemIndex  # if index == problemIndex then func will malfunction
    args["runParallel"] = runParallel    # should we run parallel
    argList.append(args)

resultVector = [None]*Np

#should we run parallel
if runParallel:
    # Run 10 processes in parallel
    p = multiprocessing.Pool(processes=10)
    ret = p.map_async(mpFunctionReportError, argList)
    # Wait until error or done
    ret.wait()
    # Queue size
    qLen = q.qsize()
    p.close()
    # Dict of error reports, keyed by index
    bugList = {}
    # Loop the queue
    for i in range(qLen):
        # Pop a value
        returnVal = q.get()
        # Check for the error code
        if returnVal["error"] is not None:
            bugList[returnVal["index"]] = returnVal
        else:
            resultVector[returnVal["index"]] = returnVal["result"]

    # Print the list of errors
    if bugList:
        print "-"*70
        print "Some parts of the parallel execution broke down. Error list:"
        print "-"*70
        for i in bugList:
            print "Index :",bugList[i]["index"]
            print "Error code :",bugList[i]["error"]
            print "Traceback :",bugList[i]["traceback"]
            print "Args :",bugList[i]["args"]
            print "-"*70
else:
    # Serial mode: call func directly
    for p in range(Np):
        resultVector[p] = func(argList[p])

for i in range(Np):
    print "Index", i, "gives",resultVector[i]

When it breaks with "runParallel = True" and "problemIndex = 4", we now get the full trace information:

Some parts of the parallel execution broke down. Error list:
Index : 4
Error code : integer division or modulo by zero
Traceback : Traceback (most recent call last):
  File "", line 44, in mpFunctionReportError
    rslt["result"] = func(args)
  File "", line 26, in func
    ret = 1/(args["index"]-args["problemIndex"])
ZeroDivisionError: integer division or modulo by zero

Args : {'Np': 10, 'index': 4, 'problemIndex': 4, 'q': <AutoProxy[Queue] object, typeid 'Queue' at 0xb708710c>, 'runParallel': True}
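
A side note for Python 3 readers: part of the reason the original program gave no hint is that it never fetched the outcome of map_async, so the exception stayed stored in the result object. As far as I know, in current Python 3 releases calling get() on that object re-raises the worker's exception in the parent and attaches the worker's formatted traceback as its __cause__. Below is a minimal sketch of that, assuming Python 3; make_args is a hypothetical helper, and the queue is dropped because the results come back through get().

```python
import multiprocessing


def func(args):
    """Return Np - index; divide by zero when index == problemIndex."""
    ret = args["Np"] - args["index"]
    if args["index"] == args["problemIndex"]:
        ret = 1 // (args["index"] - args["problemIndex"])  # emulated bug
    return ret


def make_args(Np, problemIndex):
    """Build one argument dict per index (hypothetical helper)."""
    return [{"index": i, "Np": Np, "problemIndex": problemIndex}
            for i in range(Np)]


if __name__ == "__main__":
    with multiprocessing.Pool(processes=4) as pool:
        async_result = pool.map_async(func, make_args(10, 4))
        try:
            # get() waits for the workers and re-raises a worker exception
            results = async_result.get()
        except ZeroDivisionError as exc:
            print("Worker failed:", repr(exc))
            # The worker's traceback text travels along as __cause__
            print(exc.__cause__)
        else:
            print(results)
```

With this approach a single failing element makes get() raise, so the results of the other elements are not returned; the queue-based wrapper above is still the better fit if you want to collect every result and every error.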

Upvotes: 0

John Greenall

Reputation: 1690

I have found the traceback module very useful for debugging multiprocessing code. If you pass an exception back to the main process you lose all the traceback info, so you need to call traceback.format_exc within the child process and pass that text back to the main process along with the exception. Below is a pattern that can be used with Pool.

import traceback
import multiprocessing as mp
import time

def mpFunctionReportError(kwargs):
    '''
    wrap any function and catch any errors from f, 
    putting them on the queue instead of raising
    kwargs must contain 'queue' (multiprocessing queue) 
                    and 'f' function to be run
    '''
    queue = kwargs.pop('queue')
    f = kwargs.pop('f')
    try:
        rslt = f(**kwargs)
        queue.put(rslt)
    except Exception, e:
        # Format the traceback in the child, where it is still available
        queue.put([e, traceback.format_exc()])

def doNothing(a):
    return a

def raiseException(a):
    raise ValueError('this is bad')

manager = mp.Manager()
outQ = manager.Queue()
p = mp.Pool(processes=4)

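ret = p.map_async(mpFunctionReportError,iterable=[dict(f=doNothing,queue=outQ,a='pointless!') for i in xrange(4)])
ret.wait()  # block until all four tasks have finished
for i in xrange(4):
    print outQ.get_nowait()

ret = p.map_async(mpFunctionReportError,iterable=[dict(f=raiseException,queue=outQ,a='pointless!') for i in xrange(2)])
ret.wait()
for i in xrange(2):
    e,trace = outQ.get_nowait()
    print e
    print trace

Running this example prints 'pointless!' four times, followed by the two error reports: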

this is bad
Traceback (most recent call last):
  File "/home/john/projects/", line 13, in mpFunctionReportError
    rslt = f(**kwargs)
  File "/home/john/projects/", line 24, in raiseException
    raise ValueError('this is bad')
ValueError: this is bad

this is bad
Traceback (most recent call last):
  File "/home/john/projects/", line 13, in mpFunctionReportError
    rslt = f(**kwargs)
  File "/home/john/projects/", line 24, in raiseException
    raise ValueError('this is bad')
ValueError: this is bad
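
For readers on Python 3, the same idea can be sketched without a queue by letting the wrapper return the traceback text as part of its return value. This is only a sketch under assumptions: report_errors and its (f, kwargs) tuple convention are hypothetical names chosen for illustration, not part of the code above.

```python
import multiprocessing
import traceback


def report_errors(task):
    """Run f(**kwargs) and return (result, error_text) instead of raising.

    `task` is an (f, kwargs) tuple; this convention is an assumption
    made for the sketch.
    """
    f, kwargs = task
    try:
        return f(**kwargs), None
    except Exception:
        # Format the traceback in the worker, where it is still available
        return None, traceback.format_exc()


def do_nothing(a):
    return a


def raise_exception(a):
    raise ValueError("this is bad")


if __name__ == "__main__":
    tasks = [(do_nothing, {"a": "pointless!"}),
             (raise_exception, {"a": "pointless!"})]
    with multiprocessing.Pool(processes=2) as pool:
        # map() keeps the results in the same order as the tasks
        for result, error_text in pool.map(report_errors, tasks):
            if error_text is None:
                print("result:", result)
            else:
                print("worker failed:")
                print(error_text)
```

Returning the text through map() keeps each error paired with its task, so no index has to be carried along to sort out the queue afterwards.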

Upvotes: 2


radu.ciorba

Reputation: 1054

It's not very elegant, but how about:

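import logging

def func(args):
    try:
        # Emulate that the function might be fast or slow
        time.sleep(random.randint(1,4))
        ret  = args["Np"] - args["index"]
        # Emulate a bug
        if args["index"]==args["problemIndex"]:
            ret = 1/(args["index"]-args["problemIndex"])
        # Return data
        if args["runParallel"]:
            # We use a queue thus ordering may not be protected
            args["q"].put((args["index"],ret))
        else:
            return ret
    except Exception as e:
        # Log the error with its full traceback in the worker, then re-raise
        logging.exception(e)
        raise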

The output should look like this (for problemIndex=9):

ERROR:root:integer division or modulo by zero
Traceback (most recent call last):
  File "/home/rciorba/", line 26, in func
    ret = 1/(args["index"]-args["problemIndex"])
ZeroDivisionError: integer division or modulo by zero
Bummer - one of more worker threads broke down 10 9
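
If you would rather not wrap the whole body of func in try/except, the logging can probably be moved into a decorator. The following is a minimal Python 3 sketch of that variation; log_exceptions is a hypothetical name, and the queue is left out to keep the example short.

```python
import functools
import logging
import multiprocessing


def log_exceptions(f):
    """Decorator: log any exception raised by f with its traceback, then re-raise."""
    @functools.wraps(f)  # keeps the name 'func', so Pool can still pickle it
    def wrapper(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except Exception:
            # logging.exception records the message plus the current traceback
            logging.exception("error in %s", f.__name__)
            raise
    return wrapper


@log_exceptions
def func(args):
    ret = args["Np"] - args["index"]
    if args["index"] == args["problemIndex"]:
        ret = 1 // (args["index"] - args["problemIndex"])  # emulated bug
    return ret


if __name__ == "__main__":
    argList = [{"index": i, "Np": 10, "problemIndex": 9} for i in range(10)]
    with multiprocessing.Pool(processes=4) as pool:
        try:
            print(pool.map(func, argList))
        except ZeroDivisionError:
            print("Bummer - a worker failed; see the logged traceback")
```

The traceback is logged by the worker process itself, so it shows the failing line inside func regardless of what the parent does with the exception.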

Upvotes: 1
