Reputation: 194
I'm trying to run a Dask cluster alongside a Dash app to analyze very large data sets. I'm able to run a LocalCluster successfully, and the Dask DataFrame computations complete successfully. The Dash app is started with a gunicorn command.
Unfortunately, my issues occur when I try to move the cluster to coiled.
import coiled
import dask.dataframe as dd
from dask.distributed import Client

# Build the software environment and cluster configuration on Coiled
coiled.create_software_environment(
    name="my-conda-env",
    conda={
        "channels": ["conda-forge", "defaults"],
        "dependencies": ["dask", "dash"],
    },
)
coiled.create_cluster_configuration(
    name="my-cluster-config",
    scheduler_cpu=1,
    scheduler_memory="1 GiB",
    worker_cpu=2,
    worker_memory="1 GiB",
    software="my-conda-env",
)

# Spin up the cluster and connect a client to it
cluster = coiled.Cluster(n_workers=2)
CLIENT = Client(cluster)

# table, conn_string and log are defined elsewhere in the app
dd_bills_df = dd.read_sql_table(
    table, conn_string, npartitions=10, index_col='DB_BillID'
)
CLIENT.publish_dataset(bills=dd_bills_df)
del dd_bills_df
log.debug(CLIENT.list_datasets())

x = CLIENT.get_dataset('bills').persist()
log.debug(x.groupby('BillType').count().compute())
The cluster is created and the data set is successfully published to it; the dataset is then successfully pulled by the client into the variable x. The problem occurs during the groupby() calculation.
[2021-12-03 17:40:30 -0600] [78928] [ERROR] Exception in worker process
Traceback (most recent call last):
File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/gunicorn/arbiter.py", line 589, in spawn_worker
worker.init_process()
File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/gunicorn/workers/base.py", line 134, in init_process
self.load_wsgi()
File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/gunicorn/workers/base.py", line 146, in load_wsgi
self.wsgi = self.app.wsgi()
File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/gunicorn/app/base.py", line 67, in wsgi
self.callable = self.load()
File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/gunicorn/app/wsgiapp.py", line 58, in load
return self.load_wsgiapp()
File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/gunicorn/app/wsgiapp.py", line 48, in load_wsgiapp
return util.import_app(self.app_uri)
File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/gunicorn/util.py", line 359, in import_app
mod = importlib.import_module(module)
File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/importlib/__init__.py", line 127, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
File "<frozen importlib._bootstrap>", line 1030, in _gcd_import
File "<frozen importlib._bootstrap>", line 1007, in _find_and_load
File "<frozen importlib._bootstrap>", line 986, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 680, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 855, in exec_module
File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
File "/Users/leowotzak/PenHole/test-containers2/src/application.py", line 61, in <module>
log.debug(x.groupby('BillType').count().compute())
File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/dask/base.py", line 288, in compute
(result,) = compute(self, traverse=False, **kwargs)
File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/dask/base.py", line 571, in compute
results = schedule(dsk, keys, **kwargs)
File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/distributed/client.py", line 2725, in get
results = self.gather(packed, asynchronous=asynchronous, direct=direct)
File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/distributed/client.py", line 1980, in gather
return self.sync(
File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/distributed/client.py", line 868, in sync
return sync(
File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/distributed/utils.py", line 332, in sync
raise exc.with_traceback(tb)
File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/distributed/utils.py", line 315, in f
result[0] = yield future
File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/tornado/gen.py", line 762, in run
value = future.result()
File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/distributed/client.py", line 1845, in _gather
raise exception.with_traceback(traceback)
distributed.scheduler.KilledWorker: ('_read_sql_chunk-5519d13b-b80d-468e-afd5-5f072b9adbec', <WorkerState 'tls://10.4.27.61:40673', name: coiled-dask-leowotzc2-75566-worker-6a0538671d, status: closed, memory: 0, processing: 10>)
This is the log output prior to the crash:
DEBUG:application:Dask DataFrame Structure:
BillName BillType ByRequest Congress EnactedAs IntroducedAt BillNumber OfficialTitle PopularTitle ShortTitle CurrentStatus BillSubjectTopTerm URL TextURL DB_LastModDate DB_CreatedDate
npartitions=10
1.0 object object int64 object object datetime64[ns] object object object object object object object object datetime64[ns] datetime64[ns]
2739.9 ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
24651.1 ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
27390.0 ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
Dask Name: from-delayed, 20 tasks
DEBUG:application:('bills',)
I have tried increasing the memory allocated to each worker and the number of partitions in the Dask DataFrame, to no avail. I'm struggling to figure out what is killing the workers. Has anyone else run into this error?
Upvotes: 0
Views: 239
Reputation: 194
The error stemmed from misconfigured dask-worker and dask-scheduler software environments; it was unrelated to coiled and the code sample in the original post.
The dask-scheduler and dask-worker processes were running in docker containers on EC2 instances. To initialize these processes, the following command was used:
sudo docker run -it --net=host daskdev/dask:latest dask-worker <host>:<port>
daskdev/dask is described in the documentation as follows:
This is a normal debian + miniconda image with the full Dask conda package (including the distributed scheduler), Numpy, and Pandas. This image is about 1GB in size.
The problem is that dask.dataframe.read_sql_table(...) utilizes sqlalchemy, and by extension a database driver such as pymysql. Neither is included in this base image. To solve this, the previous docker run command can be amended as follows:
sudo docker run -it -e EXTRA_PIP_PACKAGES="sqlalchemy pymysql" --net=host daskdev/dask:latest dask-worker <host>:<port>
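Note that the same consideration would apply if the workers were instead built from the Coiled software environment in the original post: its dependency list only includes dask and dash, so the SQL packages would need to be added there as well. A minimal sketch, reusing the environment name from the question (the exact package list is illustrative):

import coiled

# Sketch: add the SQL dependencies to the Coiled software environment too,
# so that workers created from it can execute read_sql_table tasks
coiled.create_software_environment(
    name="my-conda-env",
    conda={
        "channels": ["conda-forge", "defaults"],
        "dependencies": ["dask", "dash", "sqlalchemy", "pymysql"],
    },
)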
Upvotes: 1
Reputation: 15442
When doing groupby operations with large results, you can try the following (see the sketch after this list):
- Pass observed=True so that only categories appearing in each group will be present in the results. This is a quirk of pandas groupby operations but can really blow results up in dask.dataframes. See related: Avoiding Memory Issues For GroupBy on Large Pandas DataFrame
- Pass split_out=True in your aggregation call if supported, e.g. df.groupby(large_set).mean(split_out=True). By default, the result of groupby operations will return a single partition; split_out is significantly slower but won't blow up your memory. See related: dask dataframe groupby resulting in one partition memory issue
- df.map_partitions
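A minimal sketch of what the first two options look like, assuming a toy categorical column; split_out is given an integer partition count here rather than True:

import pandas as pd
import dask.dataframe as dd

# Toy frame with a categorical grouping column (illustrative data only)
pdf = pd.DataFrame({
    "BillType": pd.Categorical(["hr", "s", "hr", "hjres", "s"]),
    "Congress": [115, 115, 116, 116, 117],
})
ddf = dd.from_pandas(pdf, npartitions=2)

# observed=True keeps only the categories that actually occur in each group,
# rather than expanding the result to every possible category
counts = ddf.groupby("BillType", observed=True).count()

# split_out spreads the aggregation result over several output partitions
# instead of collapsing it into a single one
means = ddf.groupby("BillType", observed=True).mean(split_out=2)

print(counts.compute())
print(means.compute())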
Upvotes: 0
Reputation: 16561
If the dataset is very large, allocating 1 GiB to the workers and scheduler might be very constraining. There are two options to try:
- Set the memory of the workers and scheduler to a level comparable to that of your local machine (a sketch follows this list).
- Try the coiled version of the code on a fairly small subset of the table.
Upvotes: 0