Lucas H

Reputation: 1027

How to properly parallelize generic code with Numba + Dask

I am new to using Dask and Numba to speed up code, and I was hoping this could serve as a reference question on best practices for parallelizing code. I have put together a generic test case: a pandas dataframe with 3 columns.

A generic function is applied to the 3 columns to represent the kind of transformation one might do in data analysis: the first two columns are squared and summed, the square root of the sum is taken, and the result is compared to the 3rd column to produce a boolean.

I implement 4 test cases: (a) a pandas apply, (b) Dask, (c) Numba, and (d) Dask and Numba together.

Numba works great. All my issues are with Dask. Here are the issues I'm having:

  1. Dask is slower than the regular apply no matter what size I make the vectors. I am perhaps not fully understanding how and when to compute certain parts of the dataframe, or how to parallelize it properly.
  2. How do you properly use Dask to parallelize? I have set it up with 4 partitions on a 2-core processor, but how do you actually decide on the number of partitions and the scheduler? (My current guess is sketched just below.)
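Here is my current guess at how the number of partitions and the scheduler fit together (quite possibly wrong, hence the question). It reuses df and dd from the full listing below:

import multiprocessing as mp

# guess: roughly one partition per core (2 on my machine), but is that the right rule of thumb?
ddf = dd.from_pandas(df, npartitions=mp.cpu_count())

# the scheduler can be chosen per compute() call: 'threads', 'processes' or 'synchronous'
result = ddf.compute(scheduler='processes')
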
# Practice parallelizing
from dask import dataframe as dd
from numba import jit
import pandas as pd
import numpy as np
import time

# df is going to be the regular dataframe
df = pd.DataFrame(np.random.random(size=(1000000,3))*100,columns=['col1','col2','col3'])

# ddf is the dask dataframe
ddf = dd.from_pandas(df,npartitions=4)

# Check the distance regular (probably wouldn't write like this but doing for symmetry)
def check_dist(col1,col2,col3):
    dist = np.sqrt(col1**2+col2**2)
    check = dist < col3
    return check

# Jit
@jit(nopython=True)
def check_dist_fast(col1,col2,col3):
    dist = np.sqrt(col1**2+col2**2)
    check = dist < col3
    return check

#####################################
# Regular Python Apply
#####################################
t0 = time.time()
df['col4'] = df.apply(lambda x: check_dist(x.col1,x.col2,x.col3),axis=1)
t1 = time.time()-t0
print("Regular pandas took",t1)
df = df.drop('col4',axis=1)

#####################################
# Dask Apply
#####################################
t0 = time.time()
ddf['col4'] = ddf.map_partitions(lambda d: d.apply(
                                    lambda x: check_dist(x.col1,x.col2,x.col3),axis=1)
                                ).compute()
t1 = time.time()-t0
print("Dask pandas took",t1)
ddf = ddf.drop('col4',axis=1)


#####################################
# Numba Pandas
#####################################
t0 = time.time()
df['col4'] = check_dist_fast(df.col1.to_numpy(),df.col2.to_numpy(),df.col3.to_numpy())
t1 = time.time()-t0
print("Numba pandas took",t1)
df = df.drop('col4',axis=1)


#####################################
# Dask + Numba Pandas
#####################################
t0 = time.time()

ddf['col4'] = ddf.map_partitions(lambda d: d.apply(lambda x:
                    check_dist_fast(x.col1,x.col2,x.col3),axis=1)).compute()
t1 = time.time()-t0
print("Numba Dask pandas took",t1)
ddf = ddf.drop('col4',axis=1)

Finally, what are some other best practices one should be aware of? The idea is to eventually send this to some kind of cluster with many nodes.
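In case it matters for the answers: for the cluster case I was imagining something along the lines of dask.distributed, roughly like the sketch below (untested on my end, and the scheduler address is just a placeholder):

from dask.distributed import Client

client = Client()                            # local test cluster of worker processes
# client = Client('scheduler-address:8786')  # placeholder: connect to the real scheduler instead

# once a Client exists it becomes the default scheduler for compute()
result = ddf.map_partitions(lambda d: check_dist(d.col1,d.col2,d.col3)).compute()
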

Times are:

Upvotes: 3

Views: 3520

Answers (1)

FlorianEn

Reputation: 128

I think dask is so slow because you are calculating a series using:

ddf.map_partitions(
    lambda d: d.apply(lambda x: check_dist(x.col1,x.col2,x.col3), axis=1)
    ).compute()

and then assigning it to a new column; done this way, Dask cannot parallelize the process. The following code does the same thing but runs in 0.06 seconds:

#####################################
# Dask Assign
#####################################
t0 = time.time()
ddf = ddf.assign(col4=lambda x: check_dist(x.col1,x.col2,x.col3))
ddf.compute()
t1 = time.time()-t0
print("Dask using Assign took",t1)
ddf = ddf.drop('col4',axis=1)
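
The same idea should carry over to your Numba version: keep the assignment lazy and hand whole columns (as numpy arrays) to the jitted function instead of applying it row by row. A sketch of what I mean, which I have not benchmarked myself:

#####################################
# Dask + Numba, column-wise
#####################################
t0 = time.time()
ddf['col4'] = ddf.map_partitions(
    lambda d: pd.Series(check_dist_fast(d.col1.to_numpy(),
                                        d.col2.to_numpy(),
                                        d.col3.to_numpy()),
                        index=d.index))
ddf.compute()
t1 = time.time()-t0
print("Dask + Numba column-wise took",t1)
ddf = ddf.drop('col4',axis=1)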

I would suggest having a look at the best practices section in the dask docs.

Hope this helps!

Upvotes: 5
