Reputation: 1027
I am new to using Dask and Numba to speed up code, and I was hoping this could be a valuable question for users to get answers on best practices for how to parallelize code. I have made a generic test case of a pandas dataframe with 3 columns.

A generic function is applied to the 3 vectors in the frame to represent the kind of transformation one might do in data analysis: the first two columns are squared and added, the square root is taken, and then a boolean is calculated by comparing the result to the 3rd column.

I implement 4 test cases: (a) a pandas apply, (b) Dask, (c) Numba, and (d) Dask and Numba together.

Numba works great. All my issues are with Dask. Here is the issue I'm having:

Dask, no matter what size I make the vectors, is slower than the regular pandas apply. I am perhaps not fully understanding how and when to compute certain parts of the dataframe, or how to parallelize it properly.
# Practice parallelizing
from dask import dataframe as dd
from numba import jit
import pandas as pd
import numpy as np
import time
# df is going to be the regular dataframe
df = pd.DataFrame(np.random.random(size=(1000000,3))*100,columns=['col1','col2','col3'])
# ddf is the dask dataframe
ddf = dd.from_pandas(df,npartitions=4)
# Check the distance the regular way (probably wouldn't write it like this, but doing so for symmetry)
def check_dist(col1,col2,col3):
    dist = np.sqrt(col1**2+col2**2)
    check = dist < col3
    return check
# Same function compiled with Numba in nopython mode
@jit(nopython=True)
def check_dist_fast(col1,col2,col3):
    dist = np.sqrt(col1**2+col2**2)
    check = dist < col3
    return check
#####################################
# Regular Python Apply
#####################################
t0 = time.time()
df['col4'] = df.apply(lambda x: check_dist(x.col1,x.col2,x.col3),axis=1)
t1 = time.time()-t0
print("Regular pandas took",t1)
df = df.drop('col4',axis=1)
#####################################
# Dask Apply
#####################################
t0 = time.time()
ddf['col4'] = ddf.map_partitions(lambda d: d.apply(
lambda x: check_dist(x.col1,x.col2,x.col3),axis=1)
).compute()
t1 = time.time()-t0
print("Dask pandas took",t1)
ddf = ddf.drop('col4',axis=1)
#####################################
# Numba Pandas
#####################################
t0 = time.time()
df['col4'] = check_dist_fast(df.col1.to_numpy(),df.col2.to_numpy(),df.col3.to_numpy())
t1 = time.time()-t0
print("Numba pandas took",t1)
df = df.drop('col4',axis=1)
#####################################
# Dask + Numba
#####################################
t0 = time.time()
ddf['col4'] = ddf.map_partitions(lambda d: d.apply(lambda x:
check_dist_fast(x.col1,x.col2,x.col3),axis=1)).compute()
t1 = time.time()-t0
print("Numba Dask pandas took",t1)
ddf = ddf.drop('col4',axis=1)
Finally, what are some other best practices one should be aware of? The idea is to eventually send this to some kind of cluster with many nodes.
Times are:
Upvotes: 3
Views: 3520
Reputation: 128
I think dask is so slow because you are calculating a series using:
ddf.map_partitions(
lambda d: d.apply(lambda x: check_dist(x.col1,x.col2,x.col3), axis=1)
).compute()
and then subsequently assigning it to a new column; that way dask cannot parallelize the process. The following code does the same thing but runs in 0.06 seconds:
#####################################
# Dask Assign
#####################################
t0 = time.time()
ddf = ddf.assign(col4=lambda x: check_dist(x.col1,x.col2,x.col3))
ddf.compute()
t1 = time.time()-t0
print("Dask using Assign took",t1)
ddf = ddf.drop('col4',axis=1)
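If you do want to keep map_partitions, a minimal sketch (assuming the ddf, time, and jitted check_dist_fast from the question are still in scope; the helper name add_col4 is hypothetical) would be to build the column inside each partition and call compute() only once, so the assignment stays lazy and parallel:
#####################################
# Dask + Numba, lazy map_partitions
#####################################
# Hypothetical helper: add col4 inside each partition using the jitted
# function on NumPy arrays, so partitions can be processed in parallel.
def add_col4(part):
    part = part.copy()
    part['col4'] = check_dist_fast(part.col1.to_numpy(),
                                   part.col2.to_numpy(),
                                   part.col3.to_numpy())
    return part
t0 = time.time()
result = ddf.map_partitions(add_col4).compute()
t1 = time.time()-t0
print("Dask + Numba (column-wise) took",t1)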
I would suggest having a look at the best practices section in the dask docs.
Hope this helps!
Upvotes: 5