Reputation: 83147
Is it possible to apply a function to each cell of a pandas DataFrame using multiple threads or cores?
I'm aware of pandas.DataFrame.applymap but it doesn't seem to allow multithreading natively:
import numpy as np
import pandas as pd
np.random.seed(1)
frame = pd.DataFrame(np.random.randn(4, 3), columns=list('bde'),
                     index=['Utah', 'Ohio', 'Texas', 'Oregon'])
print(frame)
format = lambda x: '%.2f' % x
frame = frame.applymap(format)
print(frame)
returns:
               b         d         e
Utah    1.624345 -0.611756 -0.528172
Ohio   -1.072969  0.865408 -2.301539
Texas   1.744812 -0.761207  0.319039
Oregon -0.249370  1.462108 -2.060141
            b      d      e
Utah     1.62  -0.61  -0.53
Ohio    -1.07   0.87  -2.30
Texas    1.74  -0.76   0.32
Oregon  -0.25   1.46  -2.06
Instead, I would like to use more than one core to perform the operation, since the applied function may be complex.
Upvotes: 0
Views: 1002
Reputation: 83147
Note that on Microsoft Windows, to avoid the error "An attempt has been made to start a new process before the current process has finished its bootstrapping phase", the code that creates the pool has to be placed under an if __name__ == "__main__": guard, e.g.:
import numpy as np
import pandas as pd
from multiprocessing import Pool

def format(col):
    return col.apply(lambda x: '%.2f' % x)

if __name__ == "__main__":
    np.random.seed(1)
    frame = pd.DataFrame(np.random.randn(4, 3), columns=list('bde'),
                         index=['Utah', 'Ohio', 'Texas', 'Oregon'])
    print(frame)
    cores = 2
    pool = Pool(cores)
    for out_col in pool.imap(format, [frame[i] for i in frame]):
        frame[out_col.name] = out_col
    pool.close()
    pool.join()
    print(frame)
Regarding the use of np.array_split: in the example below the DataFrame is first cast into a NumPy array, so it only works for numbers. Example:
import numpy as np
import pandas as pd
from multiprocessing import Pool

def myfunc(a, b):
    '''
    Return a-b if a>b, otherwise return a+b
    Taken from https://docs.scipy.org/doc/numpy/reference/generated/numpy.vectorize.html
    '''
    if a > b:
        return a - b
    else:
        return a + b

def format(col):
    vfunc = np.vectorize(myfunc)
    return pd.DataFrame(vfunc(col, 2))

if __name__ == "__main__":
    np.random.seed(1)
    frame = pd.DataFrame(np.random.randn(4, 3), columns=list('bde'),
                         index=['Utah', 'Ohio', 'Texas', 'Oregon'])
    print(frame)
    cores = 2
    size = 2
    pool = Pool(cores)
    # to_numpy() replaces as_matrix(), which was removed in pandas 1.0
    frame_split = np.array_split(frame.to_numpy(), size)
    print(frame_split)
    columns = frame.columns
    # The chunks lose their labels, so restore the index and columns afterwards
    frame = pd.concat(pool.imap(format, frame_split)).set_index(frame.index)
    frame.columns = columns
    pool.close()
    pool.join()
    print(frame)
returns:
               b         d         e
Utah    1.624345 -0.611756 -0.528172
Ohio   -1.072969  0.865408 -2.301539
Texas   1.744812 -0.761207  0.319039
Oregon -0.249370  1.462108 -2.060141
[array([[ 1.62434536, -0.61175641, -0.52817175],
       [-1.07296862,  0.86540763, -2.3015387 ]]), array([[ 1.74481176, -0.7612069 ,  0.3190391 ],
       [-0.24937038,  1.46210794, -2.06014071]])]
               b         d         e
Utah    3.624345  1.388244  1.471828
Ohio    0.927031  2.865408 -0.301539
Texas   3.744812  1.238793  2.319039
Oregon  1.750630  3.462108 -0.060141
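As a side note, np.vectorize still calls myfunc once per element in Python, so for a rule this simple it may be worth checking whether plain array operations are enough before reaching for a pool. A minimal sketch, assuming the cell function can be expressed with np.where (the helper name myfunc_vectorized is made up for illustration and is not part of pandas or NumPy):

```python
import numpy as np
import pandas as pd


def myfunc_vectorized(frame, b):
    # Same rule as myfunc: a - b where a > b, otherwise a + b,
    # computed on the whole array at once instead of cell by cell.
    values = frame.to_numpy()
    result = np.where(values > b, values - b, values + b)
    # Rebuild the DataFrame with the original labels.
    return pd.DataFrame(result, index=frame.index, columns=frame.columns)


np.random.seed(1)
frame = pd.DataFrame(np.random.randn(4, 3), columns=list('bde'),
                     index=['Utah', 'Ohio', 'Texas', 'Oregon'])
print(myfunc_vectorized(frame, 2))
```

This only helps when the function maps onto array operations; for arbitrary Python code per cell, the pool-based approach above still applies.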
Upvotes: 0
Reputation: 566
Split by columns:
from multiprocessing import Pool

def format(col):
    return col.apply(lambda x: '%.2f' % x)

cores = 5
pool = Pool(cores)
for out_col in pool.imap(format, [frame[i] for i in frame]):
    frame[out_col.name] = out_col
pool.close()
pool.join()
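Or split the frame into a given number of partitions, as mentioned in the comments (run this before the pool is closed):
# func is any top-level function that takes a DataFrame chunk and returns a DataFrame
size = 10  # number of partitions, not rows per partition
frame_split = np.array_split(frame, size)
frame = pd.concat(pool.imap(func, frame_split))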
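A fuller sketch of the partition variant, under a few assumptions: func is taken to be a top-level function that receives a DataFrame chunk and returns a DataFrame, and the names two_decimals, format_chunk, split_rows and parallel_cells are made up for illustration. The chunks are cut by row position with iloc rather than by passing the frame itself to np.array_split, so each chunk stays a DataFrame with its own labels and dtypes. The pool is passed in as an argument, so anything with an ordered imap() should work.

```python
import numpy as np
import pandas as pd
from multiprocessing import Pool


def two_decimals(x):
    # Cell-level function from the question: format a number with two decimals.
    return '%.2f' % x


def format_chunk(chunk):
    # Plays the role of func: apply the cell function to every column of a chunk.
    return chunk.apply(lambda col: col.map(two_decimals))


def split_rows(frame, n_chunks):
    # Split row positions rather than values, so each chunk is still a
    # DataFrame with its original index, columns and dtypes.
    positions = np.array_split(np.arange(len(frame)), n_chunks)
    return [frame.iloc[pos] for pos in positions]


def parallel_cells(frame, chunk_func, n_chunks, pool):
    # imap() yields results in submission order, so concat restores the row order.
    return pd.concat(pool.imap(chunk_func, split_rows(frame, n_chunks)))


if __name__ == "__main__":
    np.random.seed(1)
    frame = pd.DataFrame(np.random.randn(4, 3), columns=list('bde'),
                         index=['Utah', 'Ohio', 'Texas', 'Oregon'])
    with Pool(2) as pool:
        print(parallel_cells(frame, format_chunk, 2, pool))
```

The chunk function has to live at module level so the worker processes can import it, and the sketch assumes n_chunks is not larger than the number of rows. Whether this is actually faster than a plain applymap depends on how expensive the cell function is compared with the cost of shipping chunks between processes.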
Upvotes: 1