mathdugre

Reputation: 98

Why does the dask worker fail due to MemoryError on "small" size tasks? [Dask.bag]

I am running a pipeline on multiple images. The pipeline consists of reading the images from the file system, doing some processing on each of them, and then saving the images back to the file system. However, the dask workers fail due to a MemoryError. Is there a way to ensure the dask workers don't load too many images into memory? i.e. wait until there is enough space on a worker before starting the processing pipeline on a new image.

I have one scheduler and 40 workers, each with 4 cores and 15 GB of RAM, running CentOS 7. I am trying to process 125 images in a batch; each image is fairly large but small enough to fit on a worker; around 3 GB is required for the whole process.

I tried to process a smaller number of images and it works great.

EDITED

from dask.distributed import Client, LocalCluster

# LocalCluster is used to show the config of the workers on the actual cluster
client = Client(LocalCluster(n_workers=2, resources={'process': 1}))

paths = ['list', 'of', 'paths']

# Read the file data from each path
data = client.map(read, paths, resources={'process': 1})

# Apply foo to the data n times
for _ in range(n):
    data = client.map(foo, data, resources={'process': 1})

# Save the processed data
data = client.map(save, data, resources={'process': 1})

# Retrieve results
client.gather(data)

I expected the images to be processed as space became available on the workers, but it seems like the images are all loaded simultaneously onto the different workers.

EDIT: My issue is that all tasks get assigned to workers and the workers don't have enough memory. I found how to limit the number of tasks a worker handles at a single moment ([see here](https://distributed.readthedocs.io/en/latest/resources.html#resources-are-applied-separately-to-each-worker-process)). However, with that limit, when I execute my tasks they all finish the read step, then the process step, and finally the save step. This is an issue since the images are spilled to disk.

Would there be a way to make every task finish before starting a new one? e.g. on Worker-1: read(img1)->process(img1)->save(img1)->read(img2)->...
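
For reference, this is roughly the per-image chaining I have in mind, written as a sketch with the same placeholder read/foo/save/paths names as above (I am not sure this is the idiomatic way to express it):

from dask.distributed import Client, LocalCluster

# Bundle the whole per-image chain into a single function, so each worker
# reads, processes, and saves one image before picking up the next.
def pipeline(path, n):
    img = read(path)          # load one image from the file system
    for _ in range(n):
        img = foo(img)        # apply the processing step n times
    return save(img)          # write the result back to the file system

client = Client(LocalCluster(n_workers=2, resources={'process': 1}))

# One task per image; the 'process' resource still limits how many
# of these run at once on each worker.
futures = client.map(pipeline, paths, n=n, resources={'process': 1})
client.gather(futures)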

Upvotes: 2

Views: 734

Answers (1)

mdurant

Reputation: 28684

Dask does not generally know how much memory a task will need; it can only know the size of the outputs, and that only once they are finished. This is because Dask simply executes a Python function and then waits for it to complete, but all sorts of things can happen within a Python function. You should generally expect as many tasks to begin as you have available worker cores - as you are finding.

If you want a smaller total memory load, then your solution should be simple: have a small enough number of workers, so that if all of them are using the maximum memory that you can expect, you still have some spare in the system to cope.
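
As a rough illustration with your figures (roughly 3 GB per in-flight image, 15 GB per machine), four single-threaded worker processes per machine would peak around 12 GB and still leave some headroom. The sketch below shows one way to express that sizing with LocalCluster; the exact numbers are assumptions you would tune:

from dask.distributed import Client, LocalCluster

# Example sizing only: four single-threaded workers, each capped well below
# the machine's 15 GB, so four ~3 GB images in flight still leave headroom.
cluster = LocalCluster(
    n_workers=4,            # fewer concurrent tasks per machine
    threads_per_worker=1,   # one task at a time per worker
    memory_limit='3.5GB',   # per-worker budget before dask starts spilling/pausing
)
client = Client(cluster)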

To EDIT: you may want to try running optimize on the graph before submission (although this should happen anyway, I think), as it sounds like your linear chains of tasks should be "fused". http://docs.dask.org/en/latest/optimize.html
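
For concreteness, one way to try fusion explicitly is to build the chain with dask.delayed and run fuse on the graph yourself. This is only a sketch using the question's placeholder read/foo/save/paths names, and whether it helps depends on whether the default optimizations already fuse the chain:

from dask import delayed
from dask.delayed import Delayed
from dask.optimization import fuse

# Build the per-image chains lazily: read -> foo (n times) -> save.
outputs = []
for path in paths:
    img = delayed(read)(path)
    for _ in range(n):
        img = delayed(foo)(img)
    outputs.append(delayed(save)(img))

final = delayed(list)(outputs)             # one collection holding every output

dsk = dict(final.__dask_graph__())         # raw task graph
fused_dsk, _ = fuse(dsk, keys=[final.key]) # merge linear chains into single tasks

fused = Delayed(final.key, fused_dsk)      # rebuild a delayed object on the fused graph
results = fused.compute()                  # uses the active distributed client if one exists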

Upvotes: 1
