Antonin

Reputation: 1830

Giving a large dictionary to an async function is making the code very slow

I use multiprocessing in my Python code to run a function asynchronously:

import multiprocessing

po = multiprocessing.Pool()
results = []
for elements in a_list:
    results.append(po.apply_async(my_module.my_function, (some_arguments, elements, a_big_argument)))
po.close()
po.join()
for r in results:
    a_new_list.add(r.get())

a_big_argument is a dictionary that I pass as an argument. It is big in the sense that it is between 10 and 100 MB, and it seems to have a big impact on the performance of my code.

I'm probably doing something stupid and inefficient here, since the performance of my code really dropped with this new argument.

What is the best way to deal with a big dictionary? I don't want to load it every time in my function. Would creating a database and connecting to it be a solution?

Here is a code you can run:

'''
Created on Mar 11, 2013

@author: Antonin
'''

import multiprocessing
import random

# generate an artificially big dictionary
def generateBigDict():
    myBigDict = {}
    for key in range(0, 1000000):
        myBigDict[key] = 1
    return myBigDict

def myMainFunction():
    # load the dictionary
    myBigDict = generateBigDict()
    # create a list on which we will asynchronously run the subfunction
    myList = []
    for list_element in range(0,20):
        myList.append(random.randrange(0,1000000))
    # an empty set to receive results
    set_of_results = set()
    # there is a for loop here on one of the arguments
    for loop_element in range(0,150):
        results = []
        # asynchronously run the subfunction
        po = multiprocessing.Pool()
        for list_element in myList:
            results.append(po.apply_async(mySubFunction, (loop_element, list_element, myBigDict)))               
        po.close()
        po.join()
        for r in results:
            set_of_results.add(r.get())
    for element in set_of_results:
        print(element)

def mySubFunction(loop_element, list_element, myBigDict):
    import math
    intermediaryResult = myBigDict[list_element]
    finalResult = intermediaryResult + loop_element
    return math.log(finalResult)

if __name__ == '__main__':
    myMainFunction()

Upvotes: 1

Views: 4838

Answers (3)

Antonin

Reputation: 1830

I used multiprocessing.Manager to share the dictionary between the processes:

import multiprocessing

manager = multiprocessing.Manager()
a_shared_big_dictionary = manager.dict(a_big_dictionary)

po = multiprocessing.Pool()
results = []
for elements in a_list:
    results.append(po.apply_async(my_module.my_function, (some_arguments, elements, a_shared_big_dictionary)))
po.close()
po.join()
for r in results:
    a_new_list.add(r.get())

Now, it is much faster.

Upvotes: 3

Francis Avila

Reputation: 31631

Any argument you pass to one of the Pool methods (e.g. apply_async) needs to be pickled, sent to the worker processes via pipes, and unpickled in the worker processes. This pickle/pass/unpickle process can be expensive in time and memory, especially if you have a large object graph, since each worker process must create a separate copy.
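
As a rough illustration of that cost (a hedged sketch, not part of the original answer), you can measure how many bytes have to be serialized and shipped for every single task when the big dictionary is passed as an argument:

import pickle

# Roughly what each apply_async call must serialize and send to a worker
# when the big dictionary is included in the argument tuple.
myBigDict = {key: 1 for key in range(1000000)}
payload = pickle.dumps((0, 3, myBigDict), protocol=pickle.HIGHEST_PROTOCOL)
print('bytes pickled per task: %d' % len(payload))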

There are several ways to avoid this pickling, depending on the exact shape of your problem. Since your workers only read the dictionary and never write to it, you can safely reference it directly from your function (i.e. not pass it to apply_async) and rely on fork() to avoid creating a copy in the worker processes.
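
For example, a minimal sketch of that approach, assuming the fork start method (the default on Linux) so the module-level dictionary is inherited by the workers rather than pickled; the names follow the question's code but are otherwise illustrative:

import math
import multiprocessing

# Built once in the parent process; with fork(), worker processes inherit
# this memory instead of receiving a pickled copy through a pipe.
myBigDict = {key: 1 for key in range(1000000)}

def mySubFunction(loop_element, list_element):
    # Read the inherited module-level dictionary directly instead of
    # receiving it as an apply_async argument.
    return math.log(myBigDict[list_element] + loop_element)

if __name__ == '__main__':
    po = multiprocessing.Pool()
    results = [po.apply_async(mySubFunction, (7, i)) for i in (3, 42, 999)]
    po.close()
    po.join()
    print([r.get() for r in results])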

Even better, you can change mySubFunction() so that it accepts intermediaryResult as an argument instead of looking it up using list_element and myBigDict. (You may be able to do this with a closure, but I am not 100% sure that pickle won't try to copy the closed-over myBigDict object as well.)
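
A sketch of that reshaping (keeping the question's function name, with the signature changed as suggested; everything else is illustrative): the parent does the dictionary lookup, so only the small looked-up value crosses the process boundary.

import math
import multiprocessing

def mySubFunction(loop_element, intermediaryResult):
    # Only two small numbers are pickled per task; the big dictionary
    # never leaves the parent process.
    return math.log(intermediaryResult + loop_element)

if __name__ == '__main__':
    myBigDict = {key: 1 for key in range(1000000)}
    myList = [3, 42, 999]   # illustrative keys
    po = multiprocessing.Pool()
    # Do the lookup in the parent and ship just the value to the worker.
    results = [po.apply_async(mySubFunction, (7, myBigDict[list_element]))
               for list_element in myList]
    po.close()
    po.join()
    print([r.get() for r in results])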

Alternatively, you can put myBigDict somewhere all processes can share it safely, e.g. one of the simple persistence methods, such as dbm or sqlite, and have workers access it from there.
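
As a hedged illustration of that route using shelve (one of the standard-library persistence modules; the file name, the string key encoding, and the small dictionary size are assumptions made for this sketch): the parent writes the dictionary to disk once, and each worker opens the file read-only and fetches only the value it needs.

import math
import multiprocessing
import shelve

DB_PATH = 'big_dict_store'   # assumed file name for this sketch

def mySubFunction(loop_element, list_element):
    # Each worker opens the shelf read-only and fetches a single value,
    # so the full dictionary is never pickled or copied per task.
    shelf = shelve.open(DB_PATH, flag='r')
    try:
        intermediaryResult = shelf[str(list_element)]
    finally:
        shelf.close()
    return math.log(intermediaryResult + loop_element)

if __name__ == '__main__':
    # Write the dictionary to disk once, in the parent (kept small here
    # because filling a shelf with a million entries is slow).
    shelf = shelve.open(DB_PATH, flag='c')
    for key in range(1000):
        shelf[str(key)] = 1
    shelf.close()
    po = multiprocessing.Pool()
    results = [po.apply_async(mySubFunction, (7, i)) for i in (3, 42, 999)]
    po.close()
    po.join()
    print([r.get() for r in results])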

Unfortunately, all these solutions require changing the shape of your task functions. Avoiding this "shape-changing" is one reason why people like "real" CPU threads.

Upvotes: 1

CaptSolo

Reputation: 1801

See the answer to the Shared-memory objects in python multiprocessing question.

It suggests either using multiprocessing.Array to pass arrays to subprocesses or using fork().
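
Since the keys in the question's example are just consecutive integers, a hedged sketch of the multiprocessing.Array idea (the variable names and the initializer recipe are illustrative, not from the linked answer) could replace the dictionary with a shared integer array that every worker reads without copying:

import math
import multiprocessing

def init_worker(shared_array):
    # Store the shared array in a module-level global so pooled workers
    # can read it without it being pickled for every task.
    global sharedValues
    sharedValues = shared_array

def mySubFunction(loop_element, list_element):
    return math.log(sharedValues[list_element] + loop_element)

if __name__ == '__main__':
    # lock=False is fine here because the workers only read the array.
    sharedValues = multiprocessing.Array('i', [1] * 1000000, lock=False)
    po = multiprocessing.Pool(initializer=init_worker, initargs=(sharedValues,))
    results = [po.apply_async(mySubFunction, (7, i)) for i in (3, 42, 999)]
    po.close()
    po.join()
    print([r.get() for r in results])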

Upvotes: 1
