Reputation: 8121
I have the following project tree:
.
└── src
    └── dask_test
        ├── helpers
        │   ├── commandline.py
        │   ├── data
        │   │   ├── dataset0.json
        │   │   ├── dataset1000.json
        │   │   ├── dataset300.json
        │   │   ├── dataset5000.json
        │   │   ├── dataset500.json
        │   │   ├── events_to_be_used_final_without_google.nl.json
        │   │   ├── http-malware_modified.log
        │   │   └── public_suffix_list.json
        │   ├── datetime.py
        │   ├── datetime.pyc
        │   ├── __init__.py
        │   ├── __init__.pyc
        │   ├── math.py
        │   ├── math.pyc
        │   ├── pipeline.py
        │   ├── queues.py
        │   ├── search.py
        │   ├── services.py
        │   ├── sklearn.py
        │   ├── splunk_format.py
        │   ├── splunk.py
        │   └── sqlalchemy.py
        ├── __init__.py
        ├── __init__.pyc
        ├── main.py
        └── riskanalysis
            ├── iaccess
            │   ├── __init__.py
            │   └── metrics
            │       ├── base.py
            │       ├── __init__.py
            │       └── profile
            │           └── __init__.py
            ├── __init__.py
            └── metrics
                └── __init__.py
At the beginning of my main.py I import an object from `dask_test.helpers.datetime` like this:
from dask_test.helpers.datetime import TimeWindow
to use further down in my main. In my main file I have defined some functions and apply them to a dask DataFrame like this:
dataframe = transformation1(dataframe)
dataframe = transformation2(dataframe)
dataframe = transformation3(dataframe)
dataframe = transformation4(dataframe)
Each transformation function takes the dask DataFrame and, using apply, adds a new column to it, like so:
def transformation1(dataframe):
    dataframe['new_column'] = dataframe.apply(make_sequence)
    return dataframe
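For reference, with a dask DataFrame a row-wise apply normally takes an explicit axis and a meta hint; roughly like this (make_sequence is my own function, and the object dtype for the new column is just a guess here):

def transformation1(dataframe):
    # axis=1 applies make_sequence to each row; meta tells dask the name/dtype of the new column
    dataframe['new_column'] = dataframe.apply(make_sequence, axis=1, meta=('new_column', 'object'))
    return dataframe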
Computing this using dask.distributed with a LocalCluster works correctly:
from distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=4, threads_per_worker=1, processes=True)
client = Client(cluster)
client.persist(dataframe)
But when starting dask-scheduler and dask-worker processes instead, I get the following message:
return pickle.loads(x)
ImportError: No module named dask_test.helpers.datetime
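In the distributed case the client is pointed at the standalone scheduler, roughly like this (the address and port here are placeholders, not the actual values I use):

from distributed import Client

# Connect to the separately started dask-scheduler (placeholder address)
client = Client('tcp://127.0.0.1:8786')

# Persisting triggers deserialization of the task graph on the workers,
# which is where the ImportError above shows up
dataframe = client.persist(dataframe)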
EDIT:
Importing the datetime module and cloudpickle, it seems that datetime is picklable:
from dask_test.helpers import datetime
import cloudpickle
cloudpickle.dumps(datetime) # this works
datetime_module = cloudpickle.loads(cloudpickle.dumps(datetime)) # this also works
EDIT: After some more investigating, I saw this in the log files:
distributed.protocol.pickle - INFO - Failed to deserialize �cpandas.core.frame
DataFrame
In my main file, I first create a pandas DataFrame and then use the from_pandas method to turn it into a dask DataFrame.
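The conversion is done roughly like this (the column contents and the number of partitions here are placeholders):

import pandas as pd
import dask.dataframe as dd

# Plain pandas DataFrame built first (placeholder data)
pandas_df = pd.DataFrame({'time_bucket': ['1483228800.0-1483232400.0']})

# Then turned into a dask DataFrame; npartitions is an arbitrary choice here
dataframe = dd.from_pandas(pandas_df, npartitions=4)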
EDIT 3: I found what the issue is, but I cannot understand why. In my datetime module I have defined an object called TimeWindow to deal with periods of time. My data JSON file has a field with timestamps in the form
timestamp_since-timestamp_until
I apply a function on the dataframe to add a column that transforms the above into a TimeWindow object, like this:
def convert_to_time_window(item):
    since = my_datetime.utcfromtimestamp_tzaware(float(item.split('-')[0]))
    until = my_datetime.utcfromtimestamp_tzaware(float(item.split('-')[1]))
    return my_datetime.TimeWindow(tm_since=since, tm_until=until)
and apply it on the DataFrame (this is a pandas DataFrame; I do this before creating the dask DataFrame):
dataframe['tw'] = dataframe['time_bucket'].apply(convert_to_time_window)
Without it the workers work fine. But the TimeWindow object and its instances are cloudpickleable.
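The picklability check for TimeWindow looks something like this (the timestamp values are just placeholders):

import cloudpickle
from dask_test.helpers import datetime as my_datetime

# Build a TimeWindow instance and round-trip it through cloudpickle
tw = my_datetime.TimeWindow(tm_since=my_datetime.utcfromtimestamp_tzaware(0.0),
                            tm_until=my_datetime.utcfromtimestamp_tzaware(3600.0))
cloudpickle.loads(cloudpickle.dumps(tw))  # works without error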
Upvotes: 2
Views: 2601
Reputation: 57319
It sounds like your dask-worker processes don't have access to the dask_test.helpers.datetime module the way that your client process does. From how you describe your project it sounds like you are relying on running your Python process from the same location as your files. You might do either of the following:

1. Install your software as a proper Python package on every machine that runs a dask-worker, or
2. Start your dask-worker processes from the same directory as your client process, so that dask_test is importable there as well.
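As a quick diagnostic you could try importing the module on every worker, using the same client.run mechanism shown further down (the try_import helper here is just illustrative):

import importlib

def try_import():
    # Returns the file the module was loaded from, or raises ImportError on workers that can't see it
    return importlib.import_module('dask_test.helpers.datetime').__file__

client.run(try_import)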
distributed.protocol.pickle - INFO - Failed to deserialize �cpandas.core.frame
This error sounds like your dask-worker processes don't have access to Pandas. Generally your dask-worker processes need to have the same software environment everywhere.
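One way to compare environments is client.get_versions, which reports the package versions seen by the scheduler, each worker, and the client; a sketch (check=True asks distributed to flag mismatches):

# Compare package versions across scheduler, workers and client
versions = client.get_versions(check=True)
versions['workers']   # per-worker dict of installed versions, including pandas and cloudpickle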
To verify that all of your workers have the same current working directory, try running os.getcwd on all of your workers:

In [6]: client.run(os.getcwd)
Out[6]:
{'tcp://127.0.0.1:34115': '/home/foo',
 'tcp://127.0.0.1:39449': '/home/foo',
 'tcp://127.0.0.1:40322': '/home/foo',
 'tcp://127.0.0.1:41050': '/home/foo'}

You can compare this to calling os.getcwd() locally in your Python process.
Upvotes: 2