Amir

Reputation: 441

Append dataframes to a list outside of a multiprocessing target function

I want to use multiprocessing to read CSV files and add them together; in this case the result can be unsorted. I used the code below, but apparently the dataframes can't be added to data for some reason. I have also tried data = data.append(tb, ignore_index=True), and declaring global data inside the function, but I still get no result.

import glob
import pandas as pd
from multiprocessing import Pool

files = []
for x in glob.glob("*.csv"):
    files.append(x)

data = pd.DataFrame()

def opener(file):
    tb = pd.read_csv(file)
    data.append(tb, ignore_index=True)

if __name__ == '__main__':
    p = Pool(8)
    p.map(opener, files)
    p.close()
    p.join()

print(data)

I get only

Empty DataFrame 
Columns: [] 
Index: []

Upvotes: 3

Views: 4281

Answers (2)

Amir

Reputation: 441

Well, after trying different approaches I managed to get multiprocessing to open the CSV files like this:

import glob
import pandas as pd
from multiprocessing import Pool

files = []
for x in glob.glob("*.csv"):
    files.append(x)

def data_pool(file):
    # each worker returns its dataframe to the parent process
    return pd.read_csv(file)

data = None

if __name__ == '__main__':
    p = Pool(4)
    # map() collects the returned dataframes in the parent, which concatenates them
    data = pd.concat(p.map(data_pool, files)).reset_index(drop=True)

This works because each worker returns its dataframe to the parent process instead of mutating a copy of a global, and pd.concat joins the results there. It gives the same performance as the solution of @RomanPerekhrest, but I think it's useful to share it with the community as an alternative solution.
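A small refinement, not part of the original answer: Pool also works as a context manager (since Python 3.3), which closes the pool automatically when the block ends. Assuming the same data_pool helper and files list as above:

if __name__ == '__main__':
    # the pool is closed automatically when the with-block exits
    with Pool(4) as p:
        data = pd.concat(p.map(data_pool, files)).reset_index(drop=True)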

Upvotes: 0

RomanPerekhrest

Reputation: 92894

Passing a dataframe as a shared data structure in a multiprocessing approach would be quite problematic, because a shared structure needs to be pickled and passed to each Process run in parallel. Instead, pass a shared list (the process-safe multiprocessing.Manager().list()) to accumulate the dataframes, which are then concatenated in one pass with a single pd.concat invocation.

By the way, pd.concat should be the preferred approach compared to pd.DataFrame() plus multiple .append() calls (a short sketch after the code below shows why):

from multiprocessing import Pool, Manager
import glob
import pandas as pd
from functools import partial

def csv_to_df(lst, fname):
    # each worker reads one csv and appends the frame to the shared, process-safe list
    lst.append(pd.read_csv(fname))


if __name__ == '__main__':
    dfs_list = Manager().list()
    pool = Pool(processes=8)
    files = glob.iglob('*.csv')
    res = pool.map_async(partial(csv_to_df, dfs_list), files)
    res.wait()  # block until all workers have finished
    dfs = pd.concat(dfs_list, ignore_index=True)  # the final result
    print(dfs)
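On the concat-vs-append point above: each DataFrame.append() returns a new frame and copies everything accumulated so far, so a loop of appends does quadratic work, while a single pd.concat over a list of pieces copies the data once. A minimal sketch, with toy frames standing in for the CSV reads (note that DataFrame.append was deprecated in pandas 1.4 and removed in 2.0):

import pandas as pd

# toy stand-ins for the frames pd.read_csv would return
chunks = [pd.DataFrame({"a": range(3)}) for _ in range(4)]

# quadratic: every .append() copies the whole accumulated frame
slow = pd.DataFrame()
for chunk in chunks:
    slow = slow.append(chunk, ignore_index=True)

# linear: collect the pieces first, copy once
fast = pd.concat(chunks, ignore_index=True)

assert slow.equals(fast)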

Upvotes: 4
