Leo Wotzak

Reputation: 194

Dask DataFrame Coiled KilledWorker read_sql

I'm trying to run a Dask cluster alongside a Dash app to analyze very large data sets. A LocalCluster runs successfully, and the Dask DataFrame computations complete without issue. The Dash app is served with gunicorn.

Unfortunately, my issues begin when I try to move the cluster to Coiled.

import coiled
import dask.dataframe as dd
from dask.distributed import Client

coiled.create_software_environment(
    name="my-conda-env",
    conda={
        "channels": ["conda-forge", "defaults"],
        "dependencies": ["dask", "dash"],
    },
)

coiled.create_cluster_configuration(
    name="my-cluster-config",
    scheduler_cpu=1,
    scheduler_memory="1 GiB",
    worker_cpu=2,
    worker_memory="1 GiB",
    software="my-conda-env"
)

cluster = coiled.Cluster(n_workers=2)
CLIENT = Client(cluster)

dd_bills_df = dd.read_sql_table(
    table, conn_string, npartitions=10, index_col='DB_BillID'
)
CLIENT.publish_dataset(bills=dd_bills_df)
del dd_bills_df

log.debug(CLIENT.list_datasets())

x = CLIENT.get_dataset('bills').persist()
log.debug(x.groupby('BillType').count().compute())

The cluster is created, the data set is published to it successfully, and the client then pulls the dataset into the variable x without error. The problem occurs during the groupby() computation.

[2021-12-03 17:40:30 -0600] [78928] [ERROR] Exception in worker process
Traceback (most recent call last):
  File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/gunicorn/arbiter.py", line 589, in spawn_worker
    worker.init_process()
  File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/gunicorn/workers/base.py", line 134, in init_process
    self.load_wsgi()
  File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/gunicorn/workers/base.py", line 146, in load_wsgi
    self.wsgi = self.app.wsgi()
  File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/gunicorn/app/base.py", line 67, in wsgi
    self.callable = self.load()
  File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/gunicorn/app/wsgiapp.py", line 58, in load
    return self.load_wsgiapp()
  File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/gunicorn/app/wsgiapp.py", line 48, in load_wsgiapp
    return util.import_app(self.app_uri)
  File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/gunicorn/util.py", line 359, in import_app
    mod = importlib.import_module(module)
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/importlib/__init__.py", line 127, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 1030, in _gcd_import
  File "<frozen importlib._bootstrap>", line 1007, in _find_and_load
  File "<frozen importlib._bootstrap>", line 986, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 680, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 855, in exec_module
  File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
  File "/Users/leowotzak/PenHole/test-containers2/src/application.py", line 61, in <module>
    log.debug(x.groupby('BillType').count().compute())
  File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/dask/base.py", line 288, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/dask/base.py", line 571, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/distributed/client.py", line 2725, in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
  File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/distributed/client.py", line 1980, in gather
    return self.sync(
  File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/distributed/client.py", line 868, in sync
    return sync(
  File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/distributed/utils.py", line 332, in sync
    raise exc.with_traceback(tb)
  File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/distributed/utils.py", line 315, in f
    result[0] = yield future
  File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/distributed/client.py", line 1845, in _gather
    raise exception.with_traceback(traceback)
distributed.scheduler.KilledWorker: ('_read_sql_chunk-5519d13b-b80d-468e-afd5-5f072b9adbec', <WorkerState 'tls://10.4.27.61:40673', name: coiled-dask-leowotzc2-75566-worker-6a0538671d, status: closed, memory: 0, processing: 10>)

This is the log output prior to the crash:

DEBUG:application:Dask DataFrame Structure:
               BillName BillType ByRequest Congress EnactedAs    IntroducedAt BillNumber OfficialTitle PopularTitle ShortTitle CurrentStatus BillSubjectTopTerm     URL TextURL  DB_LastModDate  DB_CreatedDate
npartitions=10                                                                                                                                                                                                 
1.0              object   object     int64   object    object  datetime64[ns]     object        object       object     object        object             object  object  object  datetime64[ns]  datetime64[ns]
2739.9              ...      ...       ...      ...       ...             ...        ...           ...          ...        ...           ...                ...     ...     ...             ...             ...
...                 ...      ...       ...      ...       ...             ...        ...           ...          ...        ...           ...                ...     ...     ...             ...             ...
24651.1             ...      ...       ...      ...       ...             ...        ...           ...          ...        ...           ...                ...     ...     ...             ...             ...
27390.0             ...      ...       ...      ...       ...             ...        ...           ...          ...        ...           ...                ...     ...     ...             ...             ...
Dask Name: from-delayed, 20 tasks
DEBUG:application:('bills',)

I have tried increasing the memory allocated to each worker and the number of partitions in the Dask DataFrame, to no avail. I'm struggling to figure out what is killing the workers. Has anyone else run into this error?

Upvotes: 0

Views: 239

Answers (3)

Leo Wotzak

Reputation: 194

Solution

The error stems from misconfigured dask-worker and dask-scheduler software environments; it is unrelated to Coiled and to the code sample in the original post.

The dask-scheduler and dask-worker processes were running in docker containers on EC2 instances. To initialize these processes, the following command was used:

sudo docker run -it --net=host daskdev/dask:latest dask-worker <host>:<port>

daskdev/dask is described in the documentation as follows:

This is a normal debian + miniconda image with the full Dask conda package (including the distributed scheduler), Numpy, and Pandas. This image is about 1GB in size.

The problem is that dask.dataframe.read_sql_table(...) uses sqlalchemy, and by extension a database driver such as pymysql. Neither is included in this base image. To solve this, the previous docker run command can be amended as follows:

sudo docker run -it -e EXTRA_PIP_PACKAGES="sqlalchemy pymysql" --net=host daskdev/dask:latest dask-worker <host>:<port>

Upvotes: 1

Michael Delgado

Reputation: 15442

When doing groupby operations with large results, you can try spreading the aggregated output over multiple partitions rather than collecting it into a single one.

Upvotes: 0

SultanOrazbayev

Reputation: 16561

If the dataset is very large, giving the workers and the scheduler only 1 GiB each might be very constraining. There are two options to try:

  1. Set the worker and scheduler memory to a level comparable to your local machine's.

  2. Try the Coiled version of the code on a fairly small subset of the table.

Upvotes: 0
