Reputation: 23
I am trying to apply a bunch of functions to a Dask DataFrame using map_partitions. It works when the function is defined locally, e.g.:
# assume a Dask DataFrame df1
def upper(x):
    return x.str.upper()

def process(df, info):
    for mapper, col in info['process']:
        df[col] = df[col].map_partitions(mapper, meta=df[col])
    return df

info = {'process': [(upper, 'column_name')]}
df = process(df1, info)
df.head()
but when we split the code into modules, it does not work:
helper.py
def upper(x):
    return x.str.upper()

def upper_lambda():
    return lambda x: x.str.upper()
main.py
import helper

# assume a Dask DataFrame df1
def process(df, info):
    for mapper, col in info['process']:
        df[col] = df[col].map_partitions(mapper, meta=df[col])
    return df

info = {'process': [(getattr(helper, 'upper'), 'column_name')]}
# Tried with the lambda too... doesn't seem to work
# info = {'process': [(helper.upper_lambda(), 'column_name')]}
df = process(df1, info)
df.head()
It just throws back a KilledWorker error: ("('assign-read-parquet-head-1-5-assign-77bd7b855e5e8eec82312c65361fc7c5', 0)",
Upvotes: 0
Views: 249
Reputation: 57319
Dask certainly supports using functions from another module. However, those modules are expected to exist on all machines that you're using. For small files like your helper.py, you may want to look into Client.upload_file to help you move it around.
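A minimal sketch of how that could look, assuming a dask.distributed cluster (the scheduler address, parquet path, and column name below are placeholders):

from dask.distributed import Client
import dask.dataframe as dd

# connect to the scheduler (placeholder address)
client = Client("tcp://scheduler-address:8786")

# ship helper.py to every worker so `import helper` resolves there too
client.upload_file("helper.py")

import helper

df1 = dd.read_parquet("data.parquet")  # placeholder path
df1["column_name"] = df1["column_name"].map_partitions(
    helper.upper, meta=df1["column_name"]
)
df1.head()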
Upvotes: 2