Apostolos

Reputation: 8121

Unable to load a module in dask worker

I have the following project tree

.
└── src
    └── dask_test
        ├── helpers
        │   ├── commandline.py
        │   ├── data
        │   │   ├── dataset0.json
        │   │   ├── dataset1000.json
        │   │   ├── dataset300.json
        │   │   ├── dataset5000.json
        │   │   ├── dataset500.json
        │   │   ├── events_to_be_used_final_without_google.nl.json
        │   │   ├── http-malware_modified.log
        │   │   └── public_suffix_list.json
        │   ├── datetime.py
        │   ├── datetime.pyc
        │   ├── __init__.py
        │   ├── __init__.pyc
        │   ├── math.py
        │   ├── math.pyc
        │   ├── pipeline.py
        │   ├── queues.py
        │   ├── search.py
        │   ├── services.py
        │   ├── sklearn.py
        │   ├── splunk_format.py
        │   ├── splunk.py
        │   └── sqlalchemy.py
        ├── __init__.py
        ├── __init__.pyc
        ├── main.py
        └── riskanalysis
            ├── iaccess
            │   ├── __init__.py
            │   └── metrics
            │       ├── base.py
            │       ├── __init__.py
            │       └── profile
            │           └── __init__.py
            ├── __init__.py
            └── metrics
                └── __init__.py

At the beginning of my main.py I import an object from `dask_test.helpers.datetime` like this

from dask_test.helpers.datetime import TimeWindow

to use further down in my main. In my main file, I have defined some functions and apply them to a dask DataFrame like this

dataframe = transformation1(dataframe)
dataframe = transformation2(dataframe)
dataframe = transformation3(dataframe)
dataframe = transformation4(dataframe)

The transformation functions take the dask DataFrame and, using apply, add a new column to it like so:

def transformation1(dataframe):
    dataframe['new_column'] = dataframe.apply(make_sequence)
    return dataframe
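
For reference, a minimal sketch of the same pattern against a dask DataFrame (the column names and the body of make_sequence here are made up for illustration). dask's DataFrame.apply works row-wise with axis=1, and a meta hint tells it the new column's name and dtype up front:

import pandas as pd
import dask.dataframe as dd

def make_sequence(row):
    # hypothetical row-wise transformation
    return str(row['time_bucket'])

pdf = pd.DataFrame({'time_bucket': ['1.0-2.0', '3.0-4.0']})
dataframe = dd.from_pandas(pdf, npartitions=2)

# axis=1 applies per row; meta describes the resulting column
dataframe['new_column'] = dataframe.apply(make_sequence, axis=1, meta=('new_column', 'object'))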

Trying to compute using dask distributed and LocalCluster works correctly:

from distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=4, threads_per_worker=1, processes=True)
client = Client(cluster)
client.persist(dataframe)
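
One note on the snippet above, unrelated to the import error: Client.persist returns the persisted, futures-backed collection rather than modifying its argument in place, so the result is normally captured:

dataframe = client.persist(dataframe)  # keep a reference to the persisted collection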

But when starting dask-scheduler and dask-worker processes I get the following message

return pickle.loads(x)
ImportError: No module named dask_test.helpers.datetime

  1. First question: doesn't LocalCluster use pickling as well?
  2. Do all modules need to contain picklable objects in order to work correctly with dask distributed?

EDIT:

Importing the datetime module and cloudpickle, it seems that datetime is picklable

from dask_test.helpers import datetime
import cloudpickle

cloudpickle.dumps(datetime)  # this works
datetime_module = cloudpickle.loads(cloudpickle.dumps(datetime)) # this also works

EDIT 2: After some more investigation, I saw this in the log files:

distributed.protocol.pickle - INFO - Failed to deserialize �cpandas.core.frame
DataFrame

In my main file, I first create a pandas DataFrame and then use the from_pandas method to turn it into a dask DataFrame.

EDIT 3: I found what the issue is, but I cannot understand why. In my datetime module I have defined an object called TimeWindow to deal with periods of time. My data JSON file has a field with timestamps in the form

timestamp_since-timestamp_until

I apply a function to the dataframe to add a column that transforms the above into a TimeWindow object, like this:

def convert_to_time_window(item):
    since = my_datetime.utcfromtimestamp_tzaware(float(item.split('-')[0]))
    until = my_datetime.utcfromtimestamp_tzaware(float(item.split('-')[1]))

    return my_datetime.TimeWindow(tm_since=since, tm_until=until)

and apply it on the DataFrame (this is a pandas DataFrame; I do this before creating the dask DataFrame):

    dataframe['tw'] = dataframe['time_bucket'].apply(convert_to_time_window)

Without it, the workers work fine. But the TimeWindow object and its instances are cloudpickleable.
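
For completeness, this is the kind of round-trip check meant above, reusing the constructor from the snippet (the timestamp values are made up):

import cloudpickle
from dask_test.helpers import datetime as my_datetime

since = my_datetime.utcfromtimestamp_tzaware(1.0)
until = my_datetime.utcfromtimestamp_tzaware(2.0)
tw = my_datetime.TimeWindow(tm_since=since, tm_until=until)

# round-trips without error locally
restored = cloudpickle.loads(cloudpickle.dumps(tw))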

Upvotes: 2

Views: 2601

Answers (1)

MRocklin

Reputation: 57319

It sounds like your dask-worker processes don't have access to the dask_test.helpers.datetime module the way that your client process does. From how you describe your project it sounds like you are relying on running your Python process from the same location as your files. You might do either of the following:

  1. Install your software as a proper Python module (see the Python packaging docs for more information; a minimal sketch follows this list)
  2. Run your dask-worker processes from the same directory as your client process
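
For option 1, a minimal packaging sketch assuming the src/ layout shown in the question (the name and version are placeholders):

# setup.py at the repository root
from setuptools import setup, find_packages

setup(
    name='dask_test',
    version='0.1',
    package_dir={'': 'src'},
    packages=find_packages('src'),
)

Installing it with pip install -e . in every environment that runs a dask-worker (and in the client environment) makes dask_test importable no matter which directory the worker was started from.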

distributed.protocol.pickle - INFO - Failed to deserialize �cpandas.core.frame

This error sounds like your dask-worker processes don't have access to Pandas. Generally your dask-worker processes need to have the same software environment everywhere.
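
One quick way to verify this is Client.get_versions, which compares the Python and package versions reported by the scheduler, every worker, and the client (the scheduler address below is an assumption):

from distributed import Client

client = Client('tcp://127.0.0.1:8786')  # address of your dask-scheduler

# raises if the scheduler, workers, and client report mismatched versions
client.get_versions(check=True)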

Verify uniform current working directory

To verify that all of your workers have the same current working directory, try running os.getcwd on all of your workers

In [6]: client.run(os.getcwd)
Out[6]: 
{'tcp://127.0.0.1:34115': '/home/foo',
 'tcp://127.0.0.1:39449': '/home/foo',
 'tcp://127.0.0.1:40322': '/home/foo',
 'tcp://127.0.0.1:41050': '/home/foo'}

You can compare this to calling os.getcwd() locally in your python process.

Upvotes: 2
