Michael

Reputation: 7397

Multiprocessing process intermediate output

I have a function that loads data and then loops over days, e.g.:

def calculate_profit(account):
    account_data = load(account) #very expensive operation
    for day in account_data.days:
        print(account_data.get(day).profit)

Because loading the data is expensive, it makes sense to use joblib/multiprocessing to load several accounts in parallel, something like this:

from joblib import Parallel, delayed

arr = [account1, account2, account3, ...]
Parallel(n_jobs=-1)(delayed(calculate_profit)(account) for account in arr)

However, I have another expensive function that I would like to apply to the intermediate results of calculate_profit. For example, assume that summing all the accounts' profits and then processing the total (posting it to a website, etc.) is expensive. This function also needs the previous day's total profit to calculate the change in profit.

def expensive_sum(prev_day_profits, *account_profits):
    total_profit_today = sum(account_profits)
    profit_difference = total_profit_today - prev_day_profits

    #some other expensive operation
    #more expensive operations
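Because each call needs the previous day's total, the calls to expensive_sum form a serial chain: day N cannot be processed before day N-1. A minimal sketch of that data flow, assuming expensive_sum returns today's total (the original returns nothing) and that the first day is compared against 0; run_days and the sample profits are hypothetical.

```python
def expensive_sum(prev_day_profits, *account_profits):
    """Return today's total and its change from the previous day's total."""
    total_profit_today = sum(account_profits)
    profit_difference = total_profit_today - prev_day_profits
    # ... the expensive processing/posting would happen here ...
    return total_profit_today, profit_difference


def run_days(profits_by_day):
    """Hypothetical driver: profits_by_day is a list of per-day lists, one profit per account."""
    prev_total = 0  # assumption: the first day is compared against zero
    changes = []
    for account_profits in profits_by_day:
        # Today's total becomes tomorrow's prev_day_profits.
        prev_total, diff = expensive_sum(prev_total, *account_profits)
        changes.append(diff)
    return changes


print(run_days([[5, 7], [6, 9], [4, 4]]))
```

With daily totals of 12, 15 and 8, the changes are 12, 3 and -7, so a single coordinator must call expensive_sum in day order even if the accounts are loaded in parallel.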

So I would like to:

  1. Run the worker processes in parallel (to spread out the cost of loading all of the expensive account data).
  2. Once each worker process hits a predefined point (e.g. finishes one iteration of the loop), send its intermediate values to another function (expensive_sum) for processing. Assume that no worker can continue until expensive_sum returns.
  3. HOWEVER, keep the worker processes alive so that I don't have to reinitialize them (avoiding that overhead).
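One way to get all three properties is a coordinator plus long-lived worker processes that meet at a `multiprocessing.Barrier` once per day. This is a swapped-in technique, not something from the question or the answer. It is a minimal sketch assuming every account covers the same days; the `load` stub, `DAYS`, `worker` and `run` are hypothetical, and this `expensive_sum` returns today's total so it can be fed back in as `prev_day_profits`.

```python
import multiprocessing as mp

DAYS = range(3)  # assumption: every account covers the same days


def load(account):
    # Hypothetical stand-in for the expensive load(account): maps day -> profit.
    return {day: account * 10 + day for day in DAYS}


def expensive_sum(prev_day_profits, *account_profits):
    # Assumption: returns today's total so it can be the next call's prev_day_profits.
    total_profit_today = sum(account_profits)
    return total_profit_today, total_profit_today - prev_day_profits


def worker(account, results, barrier):
    data = load(account)  # expensive load runs once; the process stays alive after it
    for day in DAYS:
        results.put(data[day])  # hand today's profit to the coordinator
        barrier.wait()  # block until the coordinator has finished expensive_sum


def run(accounts):
    results = mp.Queue()
    # One party per worker plus the coordinator (this process).
    barrier = mp.Barrier(len(accounts) + 1)
    procs = [mp.Process(target=worker, args=(a, results, barrier)) for a in accounts]
    for p in procs:
        p.start()
    prev_total, report = 0, []
    for day in DAYS:
        # Wait for one profit per account; no worker can queue tomorrow's
        # value until the barrier below releases it.
        profits = [results.get() for _ in accounts]
        prev_total, diff = expensive_sum(prev_total, *profits)
        report.append((day, prev_total, diff))
        barrier.wait()  # release every worker into the next day
    for p in procs:
        p.join()
    return report


if __name__ == "__main__":
    print(run([1, 2, 3]))
```

Only the coordinator calls expensive_sum, so days are processed strictly in order, while each worker keeps its loaded data in memory between days instead of being restarted.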

Is there any way to do this?

Answers (1)

vks

Reputation: 67988

from multiprocessing import Manager

manager = Manager()
queue = manager.Queue()
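`Manager().Queue()` returns a proxy object, which (unlike a plain `multiprocessing.Queue`) can be passed as an argument to `Pool` tasks; passing a plain Queue that way raises a RuntimeError saying queues should only be shared through inheritance. A minimal sketch of the producer side; `produce`, `collect` and the values put on the queue are hypothetical.

```python
from multiprocessing import Manager, Pool


def produce(account, queue):
    # Hypothetical per-account work: put one intermediate value on the shared queue.
    queue.put((account, account * 10))
    return account


def collect(accounts):
    with Manager() as manager:
        queue = manager.Queue()  # a proxy, so it can be pickled into Pool tasks
        with Pool(processes=2) as pool:
            pool.starmap(produce, [(a, queue) for a in accounts])
        # Every task has finished, so exactly len(accounts) items are waiting.
        return sorted(queue.get() for _ in accounts)


if __name__ == "__main__":
    print(collect([1, 2, 3]))  # [(1, 10), (2, 20), (3, 30)]
```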

Once each worker process hits the predefined point, it does:

queue.put(item)

Meanwhile, the other expensive function does:

item = queue.get()  # blocking call: waits until an item is available

The expensive function blocks on get(), processes each value as it arrives, and then waits on get() again.
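The consumer loop described above can be sketched as a separate process that blocks on `get()` and stops at a sentinel value. This is a minimal sketch: the sentinel (`None`), the running total standing in for the expensive processing, and the names `consumer` and `main` are all hypothetical, and for brevity the main process plays the role of the workers. Note that in this pattern producers do not wait for the consumer; if they must block until processing finishes, they also need a signal back (for example a second queue or a `multiprocessing.Barrier`).

```python
from multiprocessing import Process, Queue

SENTINEL = None  # assumption: producers never send None as real data


def consumer(queue, out):
    # Block on get(), handle each value as it arrives, stop at the sentinel.
    total = 0
    for item in iter(queue.get, SENTINEL):
        total += item  # stand-in for the expensive processing
    out.put(total)


def main(values):
    queue, out = Queue(), Queue()
    proc = Process(target=consumer, args=(queue, out))
    proc.start()
    for v in values:
        queue.put(v)  # what each worker does at its predefined point
    queue.put(SENTINEL)  # tell the consumer to finish
    result = out.get()  # read before join() so the child can flush its queue
    proc.join()
    return result


if __name__ == "__main__":
    print(main([1, 2, 3]))
```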
