Qubix
Qubix

Reputation: 4353

Using python multiprocessing on a for loop that appends results to dictionary

So I've looked at both the documentation of the multiprocessing module, and also at the other questions asked here, and none seem to be similar to my case, hence I started a new question.

For simplicity, I have a piece of code of the form:

# simple dataframe of some users and their properties.
data = {'userId': [1, 2, 3, 4],
        'property': [12, 11, 13, 43]}
df = pd.DataFrame.from_dict(data)

# a function that generates permutations of the above users, in the form of a list of lists
# such as [[1,2,3,4], [2,1,3,4], [2,3,4,1], [2,4,1,3]]
user_perm = generate_permutations(nr_perm=4)

# a function that computes some relation between users
def comp_rel(df, permutation, user_dict):
    df1 = df.userId.isin(permutation[0])
    df2 = df.userId.isin(permutation[1])
    user_dict[permutation[0]] += permutation[1]
    return user_dict


# and finally a loop: 
user_dict = defaultdict(int)
for permutation in user_perm:
    user_dict = comp_rel(df, permutation, user_dict)    

I know this code makes very little (if any) sense right now, but I just wrote a small example that is close to the structure of the actual code that I am working on. That user_dict should finally contain userIds and some value.

I have the actual code, and it works fine, gives the correct dict and everything, but... it runs on a single thread. And it's painfully slow, keeping in mind that I have another 15 threads totally free.

My question is, how can I use the multiprocessing module of python to change the last for loop, and be able to run on all threads/cores available? I looked at the documentation, it's not very easy to understand.

EDIT: I am trying to use pool as:

p = multiprocessing.Pool(multiprocessing.cpu_count())
p.map(comp_rel(df, permutation, user_dict), user_perm)
p.close()
p.join()

however this breaks because I am using the line :

user_dict = comp_rel(df, permutation, user_dict) 

in the initial code, and I don't know how these dictionaries should be merged after pool is done.

Upvotes: 4

Views: 2543

Answers (2)

Olvin Roght
Olvin Roght

Reputation: 7812

After short discussion in comments I've decided to post solution using ProcessPoolExecutor:

import concurrent.futures
from collections import defaultdict

def comp_rel(df, perm):
    ...
    return perm[0], perm[1]

user_dict = defaultdict(int)
with concurrent.futures.ProcessPoolExecutor() as executor:
    futures = {executor.submit(comp_rel, df, perm): perm for perm in user_perm}
    for future in concurrent.futures.as_completed(futures):
        try:
            k, v = future.result()
        except Exception as e:
            print(f"{futures[future]} throws {e}")
        else:
            user_dict[k] += v

It works same as @tzaman, but it gives you possibility to handle exceptions. Also there're more interesting features in this module, check docs.

Upvotes: 5

tzaman
tzaman

Reputation: 47790

There are two parts to your comp_rel which need to be separated - first is the calculation itself which is computing some value for some userID. The second is the "accumulation" step which is adding that value to the user_dict result.

You can separate the calculation itself so that it returns a tuple of (id, value) and farm it out with multiprocessing, then accumulate the results afterwards in the main thread:

from multiprocessing import Pool
from functools import partial
from collections import defaultdict

# We make this a pure function that just returns a result instead of mutating anything
def comp_rel(df, perm):
    ...
    return perm[0], perm[1]

comp_with_df = partial(comp_rel, df) # df is always the same, so factor it out
with Pool(None) as pool: # Pool(None) uses cpu_count automatically
    results = pool.map(comp_with_df, user_perm)

# Now add up the results at the end:
user_dict = defaultdict(int)
for k, v in results:
    user_dict[k] += v

Alternatively you could also pass a Manager().dict() object into the processing function directly, but that's a little more complicated and likely won't get you any additional speed.


Based on @Masklinn's suggestion, here's a slightly better way to do it to avoid memory overhead:

user_dict = defaultdict(int)
with Pool(None) as pool:
    for k, v in pool.imap_unordered(comp_with_df, user_perm):
        user_dict[k] += v

This way we add up the results as they complete, instead of having to store them all in an intermediate list first.

Upvotes: 2

Related Questions