W Oliverio

Reputation: 71

Multiple pandas data frames as parameters for multiprocessing

I'm fairly new to Python, so I apologise in advance if this is a stupid question.

I'm currently working on a development to perform several data checks.

In short, I have a main data frame that needs to be validated against the data in several other data frames.

The code I wrote works properly on a single core, but due to the data volume I need to implement multicore processing. The issue is that I can't figure out how to pass multiple pandas dataframes as arguments to the worker function.

Note that the main dataset purchase_orders is already split between the processes, so each one receives 1/4 of the data. The other dataframes are smaller and identical for every worker, so if there is a way to let the spawned processes access the data frames created in the main process, that would be fine as well, since I only read from them.
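
Just for context, the split itself is not the problem; the four chunks are produced with something along these lines (only a sketch, the exact split call doesn't matter here):

import numpy as np

# Split the row positions into 4 roughly equal parts and slice the main frame (sketch only)
idx_parts = np.array_split(np.arange(len(purchase_orders)), 4)
g_purchase_orders_1, g_purchase_orders_2, g_purchase_orders_3, g_purchase_orders_4 = (
    purchase_orders.iloc[part] for part in idx_parts)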

The dataframes change_log and parameters are used inside the apply method.

Code is below:

import multiprocessing

# This is the function I would like to call with multiprocessing
def apply_scores_test(purchase_orders, change_log, parameters):
    print('Running multicore')
    size = 1
    g_first = 'X'
    g_results = 'START'
    g_temp_lifnr = 'X'
    # calculate_scores (defined elsewhere) does the actual per-row validation
    purchase_orders = purchase_orders.apply(calculate_scores, axis=1)
    return purchase_orders

# Starting the multiprocessing pool (locked to 4 processes to make testing easier)
p = multiprocessing.Pool(4)
args = [(g_purchase_orders_1, change_log, parameters),
        (g_purchase_orders_2, change_log, parameters),
        (g_purchase_orders_3, change_log, parameters),
        (g_purchase_orders_4, change_log, parameters)]
res = p.map(apply_scores_test, args)
p.close()
p.join()

The error I'm currently getting suggests that the function is receiving just one single argument, as shown below:

TypeError: apply_scores_test() missing 2 required positional arguments: 
'change_log' and 'parameters'
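
If I understand the error correctly, map hands each tuple from args to the function as one single positional argument, so the call effectively ends up looking like this (just to illustrate what I think is happening):

# What p.map(apply_scores_test, args) effectively does for the first element:
apply_scores_test((g_purchase_orders_1, change_log, parameters))
# -> the whole tuple lands in purchase_orders, and change_log/parameters are "missing"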

Does anybody have any idea how I can pass 3 pandas dataframes to the function when I start multiprocessing?

UPDATE: I ran some more tests using multiprocessing.pool.starmap instead of map and I receive the same error. I tried to use partial as well, but that does not work either, since I need to pass at least 3 arguments and, as far as I understand, partial only works with 2 arguments.
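
For reference, my partial attempt looked roughly like this (keyword names as in my function definition; I may well have gotten the details wrong):

from functools import partial

# Bind the two shared dataframes, then map only over the purchase order chunks
worker = partial(apply_scores_test, change_log=change_log, parameters=parameters)
res = p.map(worker, [g_purchase_orders_1, g_purchase_orders_2,
                     g_purchase_orders_3, g_purchase_orders_4])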

If anyone has any other ideas, specifically for dataframes, it would be really appreciated.

Thanks a lot in advance.

Bill

Upvotes: 6

Views: 2268

Answers (1)

Parfait

Reputation: 107767

As linked, consider refactoring your code to use starmap, encapsulated in a context manager (with). On Windows, be sure to run the multiprocessing code inside an if __name__ == '__main__': block:

import multiprocessing
...

args = [(g_purchase_orders_1, change_log, parameters), 
        (g_purchase_orders_2, change_log, parameters), 
        (g_purchase_orders_3, change_log, parameters),
        (g_purchase_orders_4, change_log, parameters)]

if __name__ == '__main__':
    with multiprocessing.Pool(processes=4) as p:

        # LIST OF RETURNED DATAFRAMES
        results = p.starmap(apply_scores_test, args)

        # OUTPUT RESULTS
        for r in results:
            print(r)
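
starmap returns the results in the same order as args, so if a single frame is needed at the end, the returned chunks can be concatenated back together inside the with block (assuming pandas is imported as pd):

        # RECOMBINE THE RETURNED CHUNKS INTO ONE DATAFRAME
        final_purchase_orders = pd.concat(results)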

Upvotes: 1
