ajspencer

Reputation: 1097

Parallelize Python's reduce command

In Python I'm running a command of the form

reduce(func, bigArray[1:], bigArray[0])

and I'd like to add parallel processing to speed it up.

I am aware I can do this manually by splitting the array, running processes on the separate portions, and combining the result.

However, given the ubiquity of running reduce in parallel, I wanted to see if there's a native way, or a library, that will do this automatically.

I'm running a single machine with 6 cores.

Upvotes: 3

Views: 2860

Answers (2)

DrRaspberry

Reputation: 397

If you're able to combine map and reduce (or want to concatenate the results rather than perform a more general reduce), you could use mr4mp:

https://github.com/lapets/mr4mp

The code for the _reduce function inside the class appears to implement parallel processing via a multiprocessing.Pool, which folds the chunks with the usual reduce, roughly following this pattern:

reduce(<Function used to reduce>, pool.map(partial(reduce, <function used to reduce>), <List of results to reduce>))
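That pattern can be sketched with the standard library alone: split the input into chunks, fold each chunk in a pool worker via pool.map, then fold the partial results in the parent. This is a minimal sketch, not mr4mp itself; operator.add stands in for the reduce function (which must be associative for the split to be valid), and the chunk size of 25 is arbitrary.

```python
import multiprocessing
import operator
from functools import partial, reduce

def add(x, y):
    # Module-level so the pool can pickle it; must be associative.
    return x + y

data = list(range(100))
# Split into fixed-size chunks (size 25 is an arbitrary choice here).
chunks = [data[i:i + 25] for i in range(0, len(data), 25)]

with multiprocessing.Pool() as pool:
    # One sequential fold per chunk, run across the pool's workers.
    partials = pool.map(partial(reduce, add), chunks)

# Combine the partial results with a final sequential fold.
result = reduce(add, partials)
```

The outer reduce runs over only one value per chunk, so it stays cheap even for large inputs.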

I haven't tried it yet, but the syntax seems to be:

mr4mp.pool().mapreduce(<Function to be mapped>,<Function used to reduce>, <List of entities to apply function on>)

Upvotes: 0

ajspencer

Reputation: 1097

For anyone stumbling across this, I ended up writing a helper to do it:

import multiprocessing
from functools import reduce

# Assumes reduceFunc is an associative function defined at module level,
# so that the child processes can resolve it by name.
def parallelReduce(l, numCPUs, connection=None):
    # Base case: fold small lists (or a single-CPU budget) sequentially.
    if numCPUs == 1 or len(l) <= 100:
        returnVal = reduce(reduceFunc, l[1:], l[0])
        if connection is not None:
            connection.send(returnVal)
        return returnVal

    # Recursive case: split the list in half and fold each half in its
    # own process, dividing the remaining CPU budget between them.
    parent1, child1 = multiprocessing.Pipe()
    parent2, child2 = multiprocessing.Pipe()
    p1 = multiprocessing.Process(target=parallelReduce,
                                 args=(l[:len(l) // 2], numCPUs // 2, child1))
    p2 = multiprocessing.Process(target=parallelReduce,
                                 args=(l[len(l) // 2:], numCPUs // 2 + numCPUs % 2, child2))
    p1.start()
    p2.start()
    # Receive before joining: a large result could fill the pipe buffer
    # and deadlock if we joined first.
    leftReturn, rightReturn = parent1.recv(), parent2.recv()
    p1.join()
    p2.join()
    returnVal = reduceFunc(leftReturn, rightReturn)
    if connection is not None:
        connection.send(returnVal)
    return returnVal

Note that you can get the number of CPUs with multiprocessing.cpu_count()

Using this function showed a substantial performance increase over the serial version.
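The core trick in the helper is sending each half's result back over a Pipe and calling recv() before join(), so a large result cannot fill the pipe buffer and deadlock the join. A minimal two-process sketch of just that step, using operator.add as a stand-in reduce function:

```python
import multiprocessing
import operator
from functools import reduce

def fold_half(chunk, conn):
    # Fold one half sequentially and send the partial result to the parent.
    conn.send(reduce(operator.add, chunk))
    conn.close()

data = list(range(1000))
parent1, child1 = multiprocessing.Pipe()
parent2, child2 = multiprocessing.Pipe()
p1 = multiprocessing.Process(target=fold_half, args=(data[:500], child1))
p2 = multiprocessing.Process(target=fold_half, args=(data[500:], child2))
p1.start()
p2.start()
# recv() before join(): joining first can deadlock if the child is
# blocked sending a result larger than the pipe buffer.
left, right = parent1.recv(), parent2.recv()
p1.join()
p2.join()
total = operator.add(left, right)
```

The full helper simply applies this step recursively until the CPU budget or the list is exhausted.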

Upvotes: 9
