Reputation: 4353
So I've looked at both the documentation of the multiprocessing module and at the other questions asked here, and none seem similar to my case, hence I started a new question.
For simplicity, I have a piece of code of the form:
import pandas as pd
from collections import defaultdict

# simple dataframe of some users and their properties.
data = {'userId': [1, 2, 3, 4],
        'property': [12, 11, 13, 43]}
df = pd.DataFrame.from_dict(data)

# a function that generates permutations of the above users, in the form of a list of lists
# such as [[1, 2, 3, 4], [2, 1, 3, 4], [2, 3, 4, 1], [2, 4, 1, 3]]
user_perm = generate_permutations(nr_perm=4)

# a function that computes some relation between users
def comp_rel(df, permutation, user_dict):
    df1 = df.userId.isin(permutation[0])
    df2 = df.userId.isin(permutation[1])
    user_dict[permutation[0]] += permutation[1]
    return user_dict

# and finally a loop:
user_dict = defaultdict(int)
for permutation in user_perm:
    user_dict = comp_rel(df, permutation, user_dict)
I know this code makes very little (if any) sense right now, but it's a small example that is close in structure to the actual code I am working on. That user_dict should finally map userIds to some value.

I have the actual code, and it works fine, gives the correct dict and everything, but... it runs on a single thread. And it's painfully slow, keeping in mind that I have another 15 threads totally free.
My question is: how can I use Python's multiprocessing module to change the last for loop and run it on all available threads/cores? I looked at the documentation, but it's not very easy to understand.
EDIT: I am trying to use a Pool as follows:

import multiprocessing

p = multiprocessing.Pool(multiprocessing.cpu_count())
p.map(comp_rel(df, permutation, user_dict), user_perm)
p.close()
p.join()
However, this breaks because the initial code relies on the line:

user_dict = comp_rel(df, permutation, user_dict)

and I don't know how these dictionaries should be merged after the pool is done.
Upvotes: 4
Views: 2543
Reputation: 7812
After a short discussion in the comments I've decided to post a solution using ProcessPoolExecutor:
import concurrent.futures
from collections import defaultdict

def comp_rel(df, perm):
    ...
    return perm[0], perm[1]

user_dict = defaultdict(int)
with concurrent.futures.ProcessPoolExecutor() as executor:
    futures = {executor.submit(comp_rel, df, perm): perm for perm in user_perm}
    for future in concurrent.futures.as_completed(futures):
        try:
            k, v = future.result()
        except Exception as e:
            print(f"{futures[future]} throws {e}")
        else:
            user_dict[k] += v
It works the same as @tzaman's answer, but it gives you the possibility to handle exceptions. There are also more interesting features in this module; check the docs.
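For instance, if you don't need per-future exception handling, executor.map is a more compact alternative. A minimal sketch, assuming the same comp_rel, df and user_perm as above:

from functools import partial

user_dict = defaultdict(int)
with concurrent.futures.ProcessPoolExecutor() as executor:
    # executor.map yields results in input order and re-raises the first
    # worker exception when its result is consumed, instead of per future
    for k, v in executor.map(partial(comp_rel, df), user_perm):
        user_dict[k] += v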
Upvotes: 5
Reputation: 47790
There are two parts to your comp_rel which need to be separated. The first is the calculation itself, which computes some value for some userId. The second is the "accumulation" step, which adds that value to the user_dict result.

You can separate out the calculation so that it returns a tuple of (id, value), farm it out with multiprocessing, and then accumulate the results afterwards in the main thread:
from multiprocessing import Pool
from functools import partial
from collections import defaultdict

# We make this a pure function that just returns a result instead of mutating anything
def comp_rel(df, perm):
    ...
    return perm[0], perm[1]

comp_with_df = partial(comp_rel, df)  # df is always the same, so factor it out

with Pool(None) as pool:  # Pool(None) uses cpu_count automatically
    results = pool.map(comp_with_df, user_perm)

# Now add up the results at the end:
user_dict = defaultdict(int)
for k, v in results:
    user_dict[k] += v
Alternatively, you could pass a Manager().dict() object into the processing function directly, but that's a little more complicated and likely won't get you any additional speed.
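For completeness, here's a rough sketch of what that could look like; the comp_rel_shared helper and the explicit lock are illustrative assumptions, not part of the original code:

from multiprocessing import Pool, Manager

def comp_rel_shared(df, perm, shared_dict, lock):
    ...
    with lock:  # guard the read-modify-write on the shared proxy
        shared_dict[perm[0]] = shared_dict.get(perm[0], 0) + perm[1]

with Manager() as manager:
    shared_dict = manager.dict()
    lock = manager.Lock()
    with Pool(None) as pool:
        pool.starmap(comp_rel_shared,
                     [(df, perm, shared_dict, lock) for perm in user_perm])
    user_dict = dict(shared_dict)  # copy out before the manager shuts down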
Based on @Masklinn's suggestion, here's a slightly better way to do it to avoid memory overhead:
user_dict = defaultdict(int)
with Pool(None) as pool:
    for k, v in pool.imap_unordered(comp_with_df, user_perm):
        user_dict[k] += v
This way we add up the results as they complete, instead of having to store them all in an intermediate list first.
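If the individual tasks are very small, it may also be worth passing a chunksize so that permutations are handed to the workers in batches rather than one at a time; the value 64 below is just an illustrative guess to tune for your workload:

user_dict = defaultdict(int)
with Pool(None) as pool:
    # larger chunks mean fewer inter-process round trips
    for k, v in pool.imap_unordered(comp_with_df, user_perm, chunksize=64):
        user_dict[k] += v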
Upvotes: 2