Reputation: 441
I want to use multiprocessing to read several CSV files and combine them into one dataframe. In this case the order doesn't matter. I used the code below, but apparently the dataframes can't be appended to data for some reason. I have also tried data = data.append(tb, ignore_index=True), and declaring global data inside the function, but I still get no result.
import glob
import pandas as pd
from multiprocessing import Pool

files = []
for x in glob.glob("*.csv"):
    files.append(x)

data = pd.DataFrame()

def opener(file):
    tb = pd.read_csv(file)
    data.append(tb, ignore_index=True)

if __name__ == '__main__':
    p = Pool(8)
    p.map(opener, files)
    p.close()
    p.join()
    print(data)
I only get:
Empty DataFrame
Columns: []
Index: []
Upvotes: 3
Views: 4281
Reputation: 441
Well, after trying different approaches I managed to get multiprocessing working for opening the CSV files, like this:
import glob
import pandas as pd
from multiprocessing import Pool

files = []
for x in glob.glob("*.csv"):
    files.append(x)

def data_pool(file):
    return pd.read_csv(file)

data = None

if __name__ == '__main__':
    p = Pool(4)
    data = pd.concat(p.map(data_pool, files)).reset_index(drop=True)
It gives the same performance as the solution of @RomanPerekhrest, but I think it's useful to share with the community as an alternative solution.
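As a side note, here is a minimal sketch of the same pattern with the pool used as a context manager, so the worker processes are shut down automatically once map() has returned (assuming the CSV files sit in the working directory):

import glob
import pandas as pd
from multiprocessing import Pool

def data_pool(file):
    return pd.read_csv(file)

if __name__ == '__main__':
    files = glob.glob("*.csv")
    # The with-block terminates the pool on exit; map() has
    # already collected all results by then.
    with Pool(4) as p:
        data = pd.concat(p.map(data_pool, files)).reset_index(drop=True)
    print(data.shape)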
Upvotes: 0
Reputation: 92894
Passing a dataframe around as a shared data structure in multiprocessing would be quite problematic, because the shared structure needs to be pickled/packed and passed to each Process run in parallel.
Instead, pass a process-safe shared list (multiprocessing.Manager.list) to accumulate the dataframes, which are then concatenated in one pass with a single pd.concat invocation.
By the way, pd.concat should be the preferred approach compared to a pd.DataFrame() grown with multiple .append() calls; a sketch contrasting the two follows the example below.
from multiprocessing import Pool, Manager
import glob
import pandas as pd
from functools import partial

def csv_to_df(lst, fname):
    # Each worker reads one file and appends the resulting
    # dataframe to the process-safe shared list.
    lst.append(pd.read_csv(fname))

if __name__ == '__main__':
    dfs_list = Manager().list()
    pool = Pool(processes=8)
    files = glob.iglob('*.csv')
    res = pool.map_async(partial(csv_to_df, dfs_list), files)
    res.wait()
    dfs = pd.concat(dfs_list, ignore_index=True)  # the final result
    print(dfs)
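To illustrate that aside, a small self-contained sketch (toy frames standing in for the per-file dataframes) contrasting the two approaches:

import pandas as pd

frames = [pd.DataFrame({"a": [i], "b": [i * 10]}) for i in range(3)]

# One pass: pd.concat allocates the combined frame once.
combined = pd.concat(frames, ignore_index=True)

# Repeated appends: every .append() copies all rows accumulated
# so far, so the total work grows quadratically with the number
# of frames.
grown = pd.DataFrame()
for f in frames:
    grown = grown.append(f, ignore_index=True)

For what it's worth, DataFrame.append was later deprecated and removed in pandas 2.0, which makes pd.concat the way to go in any case.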
Upvotes: 4