Reputation: 7397
I have a function that loads data and loops through days, e.g.:

def calculate_profit(account):
    account_data = load(account)  # very expensive operation
    for day in account_data.days:
        print(account_data.get(day).profit)
Because the loading of the data is expensive, it makes sense to use joblib/multiprocessing to do something like this:

arr = [account1, account2, account3, ...]
joblib.Parallel(n_jobs=-1)(delayed(calculate_profit)(account) for account in arr)
However, I have another expensive function that I would like to apply to the intermediate results of the calculate_profit function. For example, assume that it is an expensive operation to sum up all of the profits and process them/post them to a website/etc. I also need the previous day's profits to calculate the profit change in this function.

def expensive_sum(prev_day_profits, *account_profits):
    total_profit_today = sum(account_profits)
    profit_difference = total_profit_today - prev_day_profits
    # some other expensive operation
    # more expensive operations
So I would like the parallel processes to hand their intermediate results to expensive_sum to process - assume that each individual multiprocessing process cannot continue until expensive_sum returns. Is there any way to do this?
Upvotes: 1
Views: 417
Reputation: 67988
Use a managed queue:

from multiprocessing import Manager

manager = Manager()
queue = manager.Queue()

Once each multiprocessing process hits a predefined point, do

queue.put(item)

Meanwhile the other expensive function does

item = queue.get()  # blocking call

The expensive function waits on get(), goes ahead when it receives a value, processes it, and then waits on get() again.
Upvotes: 1