user2416719

Reputation: 23

Python, Dask - Using functions from another module and mapping to Dask Dataframe

I am trying to apply a bunch of functions to a Dask DataFrame using map_partitions. It works when the function is defined locally, e.g.:

# assume a Dask DataFrame df1

def upper(x):
    return x.str.upper()

def process(df, info):
    for mapper, col in info['process']:
        df[col] = df[col].map_partitions(mapper, meta=df[col])
    return df

info = {'process': [(upper, 'column_name')]}
df = process(df1, info)

df.head()

But when I split the code into modules, it does not work.

helper.py

def upper(x):
    return x.str.upper()

def upper_lambda():
    return lambda x: x.str.upper()

main.py

import helper

#assume a data frame df1

def process(df, info):
    for mapper, col in info['process']:
        df[col] = df[col].map_partitions(mapper, meta=df[col])
    return df

info = {'process': [(getattr(helper, 'upper'), 'column_name')]}
# Tried with the lambda too... doesn't seem to work
# info = {'process': [(helper.upper_lambda(), 'column_name')]}

df = process(df1, info)

df.head()

It just throws back a KilledWorker error:

KilledWorker: ("('assign-read-parquet-head-1-5-assign-77bd7b855e5e8eec82312c65361fc7c5', 0)",

Upvotes: 0

Views: 249

Answers (1)

MRocklin

Reputation: 57319

Dask certainly supports using functions from another module. However, those modules are expected to exist on all machines that you're using. Functions defined inline in your script are serialized by value (with cloudpickle) and shipped to the workers, whereas functions imported from a module are pickled by reference, so every worker must be able to import that module itself.

For a small file like your helper.py, you may want to look into Client.upload_file to help you move it around.
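A minimal sketch of that approach (the scheduler address, the parquet path, and df1 are assumptions for illustration; process() is the same function from main.py above):

from dask.distributed import Client
import dask.dataframe as dd
import helper  # importable locally; upload_file makes it importable on the workers

# Connect to the cluster (address is hypothetical; use your own scheduler).
client = Client('tcp://scheduler:8786')

# Ship helper.py to every worker so that helper.upper can be
# unpickled by reference and imported there.
client.upload_file('helper.py')

df1 = dd.read_parquet('data.parquet')  # hypothetical input

# Same process() as in main.py.
def process(df, info):
    for mapper, col in info['process']:
        df[col] = df[col].map_partitions(mapper, meta=df[col])
    return df

info = {'process': [(helper.upper, 'column_name')]}
df = process(df1, info)
df.head()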

Upvotes: 2
