Reputation: 15
I'm trying to add new values to a DataFrame from parallel worker processes.
def dosth(nc):
    """Process one opened netCDF dataset and return the value to store.

    Placeholder for your own processing logic. It is defined at module level
    and receives the dataset explicitly, because it is called as ``dosth(nc)``.
    """
    # TODO: replace with your own processing of ``nc``.
    raise NotImplementedError("replace with your own processing of nc")


def main(q):
    """Worker: compute the value for data item ``q`` and return ``(q, value)``.

    This runs in a pool worker process, so it does not touch the DataFrame:
    reading ``ns.df`` through a Manager ``Namespace`` proxy hands back a
    pickled copy, and item assignment on that copy never reaches the manager
    (the update is silently lost). The parent process does the assignment.
    """
    # datalist is indexed here (the entry point also takes len() of it);
    # calling it like a function fails when it is a list.
    file = datalist[q]
    # Ensure the dataset is closed even if processing raises.
    with nc4.Dataset(file, 'r') as nc:
        a = dosth(nc)
    return q, a


if __name__ == '__main__':
    # Process-starting code and I/O stay under the guard so they are not
    # re-run when worker processes import this module (spawn start method).
    df = pd.read_csv('testfile.csv')
    with mp.Pool(processes=10) as pool:
        # map blocks until every worker result is back.
        results = pool.map(main, range(len(datalist)))
    # Assign in the parent, the only process holding the real DataFrame.
    # Assumes the row label equals the item index q (true for read_csv's
    # default RangeIndex) -- adjust if rows are keyed differently.
    # `.loc` is required: df[q, 'y'] = a would create a column named (q, 'y').
    for q, a in results:
        df.loc[q, 'y'] = a
    df.to_excel('result_t.xlsx')
I tried the code above, but the values computed by my own function never show up in the DataFrame that gets written out.
What am I doing wrong?
Thank you in advance.
Upvotes: 1
Views: 1507
Reputation: 2180
This is what I use to apply a function to a DataFrame in parallel:
import multiprocessing
from functools import partial

import numpy
import pandas
def apply_parallel(df, func, args=None, processes=None):
    """Apply ``func`` to row chunks of ``df`` in parallel and concatenate the results.

    ``df`` is split positionally into ``processes`` contiguous row chunks
    (default: ``multiprocessing.cpu_count()``), and each chunk is passed to
    ``func`` in a worker process via ``multiprocessing.Pool.map``.

    Args:
        df: the DataFrame to process.
        func: callable taking one DataFrame chunk. It is sent to the workers
            by pickling, so it must be picklable (e.g. a module-level function).
        args: optional mapping of keyword arguments bound onto ``func`` with
            ``functools.partial``.
        processes: number of chunks and worker processes; ``None`` uses
            ``multiprocessing.cpu_count()``.

    Returns:
        ``pandas.concat`` of the per-chunk results, or ``df`` itself when every
        chunk result is ``None``. Any in-place change ``func`` makes to its
        chunk happens on the worker's copy and is not reflected in ``df``.
    """
    cores = multiprocessing.cpu_count() if processes is None else processes
    if args:
        func = partial(func, **args)
    # Split positional row ranges with iloc; numpy.array_split(df, ...) on a
    # DataFrame goes through the deprecated DataFrame.swapaxes.
    row_chunks = numpy.array_split(numpy.arange(len(df)), cores)
    df_split = [df.iloc[rows] for rows in row_chunks]
    with multiprocessing.Pool(cores) as pool:
        results = pool.map(func, df_split)
    if all(result is None for result in results):
        # func was run only for its side effects; there is nothing to concat.
        return df
    return pandas.concat(results)
Upvotes: 1