Reputation: 1357
I'm trying to read and process a list of csv files in parallel and concatenate the output into a single pandas dataframe
for further processing.
My workflow consists of 3 steps:
create a series of pandas dataframes by reading a list of csv files (all with the same structure)
import pandas as pd

def loadcsv(filename):
    df = pd.read_csv(filename)
    return df
for each dataframe create a new column by processing 2 existing columns
def makegeom(a, b):
    return 'Point(%s %s)' % (a, b)

def applygeom(df):
    df['Geom'] = df.apply(lambda row: makegeom(row['Easting'],
                                               row['Northing']),
                          axis=1)
    return df
concatenate all the dataframes into a single dataframe
frames = []
for i in csvtest:
    df = applygeom(loadcsv(i))
    frames.append(df)
mergedresult1 = pd.concat(frames)
In my workflow I use pandas, and each of the 15 csv files has more than 2*10^6 data points, so it takes a while to complete. I think this kind of workflow should take advantage of some parallel processing (at least for the read_csv and apply steps), so I gave dask a try, but I was not able to use it properly: in my attempt I didn't gain any improvement in speed.
I made a simple notebook to replicate what I'm doing:
https://gist.github.com/epifanio/72a48ca970a4291b293851ad29eadb50
My question is ... what's the proper way to use dask to accomplish my use case?
Upvotes: 4
Views: 6093
Reputation: 1357
In the meantime, I have found other ways (alternatives to Dask) that are, in my opinion, relatively easier for running a function func in parallel over a pandas dataframe. In both cases I take advantage of the numpy.array_split method.
One uses a combination of python's multiprocessing.Pool, numpy.array_split and pandas.concat and works this way:
import numpy as np
import pandas as pd
from multiprocessing import Pool

def func(df):
    # do some computation on the given chunk and return a dataframe
    pass

def parallelize_dataframe(df, func, n_cores=72):
    df_split = np.array_split(df, n_cores)
    pool = Pool(n_cores)
    df = pd.concat(pool.map(func, df_split))
    pool.close()
    pool.join()
    return df
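Applied to the workflow from the question, a minimal usage sketch (assuming the loadcsv and applygeom functions and the csvtest list of file names defined above) could be:
import pandas as pd

# read all files, concatenate them, then run applygeom over the chunks in parallel
merged = pd.concat(loadcsv(f) for f in csvtest)
result = parallelize_dataframe(merged, applygeom, n_cores=8)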
Another is to use the powerful but simple ray cluster (which is quite useful if you can run the code over multiple machines):
# connect to a ray cluster
import ray
ray.init(address="auto", redis_password="5241590000000000")

import numpy as np
import pandas as pd

@ray.remote
def func(df):
    # do some computation on the given dataframe chunk
    pass

df_split = np.array_split(df, 288)
result = pd.concat(ray.get([func.remote(i) for i in df_split]))
The methods above work quite well for simple functions func where the computation can be carried out with numpy and the returned product can be concatenated back into a pandas dataframe. For functions that do simpler file manipulation I also found parmap.map useful, but that is off-topic for this S.O. question.
Upvotes: 1
Reputation: 57271
In Pandas I would use the apply method
In [1]: import pandas as pd

In [2]: df = pd.DataFrame({'a': [1, 2, 3], 'b': [3, 2, 1]})

In [3]: def makegeom(row):
   ...:     a, b = row
   ...:     return 'Point(%s %s)' % (a, b)
   ...:

In [4]: df.apply(makegeom, axis=1)
Out[4]:
0    Point(1 3)
1    Point(2 2)
2    Point(3 1)
dtype: object
In dask.dataframe you can do the same thing:

In [5]: import dask.dataframe as dd

In [6]: ddf = dd.from_pandas(df, npartitions=2)

In [7]: ddf.apply(makegeom, axis=1).compute()
Out[7]:
0    Point(1 3)
1    Point(2 2)
2    Point(3 1)
In either case you can then add the new series to the dataframe:
df['geom'] = df[['a', 'b']].apply(makegeom, axis=1)
If you have CSV data then I would use the dask.dataframe.read_csv function
ddf = dd.read_csv('filenames.*.csv')
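For the use case in the question, a minimal sketch could look like the following (the Easting/Northing column names come from the question, the file glob is assumed, and meta just tells dask what the result of apply looks like):
import dask.dataframe as dd

def makegeom(row):
    return 'Point(%s %s)' % (row['Easting'], row['Northing'])

# read all matching csv files in parallel into one lazy dataframe
ddf = dd.read_csv('csvdir/*.csv')
ddf['Geom'] = ddf.apply(makegeom, axis=1, meta=('Geom', 'object'))
# materialize the result as a single pandas dataframe
result = ddf.compute()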
If you have other kinds of data then I would use dask.delayed
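One possible sketch of that approach (not the only way), reusing loadcsv and applygeom from the question so that each csv file becomes one delayed partition:
import dask
import dask.dataframe as dd

# one lazy task per file: read it, then add the Geom column
parts = [dask.delayed(applygeom)(dask.delayed(loadcsv)(f)) for f in csvtest]
ddf = dd.from_delayed(parts)
# run the tasks in parallel and concatenate into a single pandas dataframe
result = ddf.compute()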
Upvotes: 4