Balaji Krishnan

Reputation: 457

Python parallel data processing

We have a dataset with approximately 1.5MM rows that I would like to process in parallel. The main job of the code is to look up master information and enrich the 1.5MM rows. The master is a two-column dataset with roughly 25,000 rows. However, I am unable to make the multiprocessing work and test its scalability properly. Can someone please help? The cut-down version of the code is as follows:

import pandas
from multiprocessing import Pool

def work(data):
    mylist =[]
    #Business Logic
    return mylist.append(data)

if __name__ == '__main__':
    data_df = pandas.read_csv('D:\\retail\\customer_sales_parallel.csv',header='infer')
    print('Source Data :', data_df)
    agents = 2
    chunksize = 2
    with Pool(processes=agents) as pool:
            result = pool.map(func=work, iterable= data_df, chunksize=20)
            pool.close()
            pool.join()
    print('Result :', result)

Method work will contain the business logic, and I would like to pass partitioned data_df into work to enable parallel processing. The sample data is as follows:

CUSTOMER_ID,PRODUCT_ID,SALE_QTY
641996,115089,2
1078894,78144,1
1078894,121664,1
1078894,26467,1
457347,59359,2
1006860,36329,2
1006860,65237,2
1006860,121189,2
825486,78151,2
825486,78151,2
123445,115089,4

Ideally, I would like to process 6 rows in each partition.

Please help.

Thanks and Regards

Bala

Upvotes: 1

Views: 957

Answers (2)

Brenden Petersen

Reputation: 2022

First, work is returning the output of mylist.append(data), which is None. I assume (and if not, I suggest) you want to return a processed DataFrame.

To distribute the load, you could use numpy.array_split to split the large DataFrame into a list of 6-row DataFrames, which are then processed by work.

import pandas
import math
import numpy as np
from multiprocessing import Pool

def work(data):
    #Business Logic
    return data # Return it as a Dataframe

if __name__ == '__main__':
    data_df = pandas.read_csv('D:\\retail\\customer_sales_parallel.csv',header='infer')
    print('Source Data :', data_df)
    agents = 2
    rows_per_workload = 6
    num_loads = math.ceil(data_df.shape[0]/float(rows_per_workload))
    split_df = np.array_split(data_df, num_loads) # A list of Dataframes
    with Pool(processes=agents) as pool:
        result = pool.map(func=work, iterable=split_df)
        result = pandas.concat(result) # Stitch them back together
        pool.close()
        pool.join()
    print('Result :', result)
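
Since the question describes the business logic as enriching each chunk against a roughly 25,000-row, two-column master dataset, the body of work might look something like the sketch below. The master file path and its column names (PRODUCT_ID, PRODUCT_NAME) are assumptions for illustration, not taken from the question:

import pandas

# Assumed two-column master lookup, e.g. PRODUCT_ID -> PRODUCT_NAME (path and names are illustrative)
master_df = pandas.read_csv('D:\\retail\\product_master.csv', header='infer')

def work(data):
    # Enrich the chunk by joining it against the master on the assumed key column
    return data.merge(master_df, on='PRODUCT_ID', how='left')

Because master_df is loaded at module level, each spawned worker process reads it once at startup instead of it being pickled and sent along with every chunk.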

Upvotes: 2

Marco

Reputation: 1172

My best recommendation is to use the chunksize parameter of read_csv (Docs) and iterate over the chunks. That way you won't exhaust your RAM by loading everything at once, and if you want you can also use threads to speed up the processing. For example:

for i,chunk in enumerate(pd.read_csv('bigfile.csv', chunksize=500000)):
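
A fuller sketch of that pattern might look like the following; the work function, the file name, and the chunk size are illustrative assumptions rather than part of this answer:

import pandas as pd

def work(chunk):
    # Placeholder business logic: process one chunk and return it
    return chunk

if __name__ == '__main__':
    results = []
    # Read the CSV lazily, 500,000 rows at a time, so the whole file
    # never has to sit in memory at once
    for i, chunk in enumerate(pd.read_csv('bigfile.csv', chunksize=500000)):
        results.append(work(chunk))
    print('Result :', pd.concat(results))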

I'm not sure if this answers your specific question, but I hope it helps.

Upvotes: 0
