Guido Muscioni

Reputation: 1295

Dask and numpy - slow conversion between numpy array and dask array

I need to save a dask array built from a big numpy array. Below is a minimal working example that shows the process. Note that a is created with numpy.random only for this MWE; unfortunately, I cannot create the array with dask.

import numpy as np
import dask.array as da
from dask.distributed import Client

a = np.random.randint(0, 2, size=4000000*8000).reshape((4000000, 8000))
# here the conversion and saving
client = Client(n_workers=90, threads_per_worker=20, processes=True)
dask_array = da.from_array(a, chunks=100000)
da.to_npy_stack('my-folder/', dask_array)
client.close()

The problem I am facing is that a takes around 100GB in memory; however, when the dask part runs, memory usage keeps climbing until it almost fills the available RAM, which is more than 300GB. Then it does some computing, and after a while (about 10 minutes) I get a memory error. I need the array saved by dask because I have another pipeline (which cannot be connected directly to this one) that uses dask arrays, and reading a dask array back from disk requires the info file (if there is any other method to dump the array and create the info file, I am open to trying it).
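
For reference, the downstream pipeline loads the stack back with something like the following (a sketch; da.from_npy_stack is the dask function that consumes the info file):

import dask.array as da

# from_npy_stack uses the info file written by to_npy_stack to recover
# the array's shape, dtype and chunking, loading blocks lazily
dask_array = da.from_npy_stack('my-folder/')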

Any suggestions on how to speed this up and get it to complete?

Upvotes: 0

Views: 1188

Answers (2)

MRocklin

Reputation: 57251

If you are on a single machine then I recommend using the standard threaded scheduler rather than the dask.distributed.Client. You will keep all of the data in the same process this way, and remove the need to make copies of your large Numpy array.
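
A minimal sketch of that approach, reusing the array from the question (no Client is created, so to_npy_stack runs on dask's default threaded scheduler and the worker threads share memory with a rather than receiving copies):

import numpy as np
import dask.array as da

a = np.random.randint(0, 2, size=4000000*8000).reshape((4000000, 8000))

# no dask.distributed.Client: from_array and to_npy_stack fall back to
# the default threaded scheduler, so each thread reads a in place
dask_array = da.from_array(a, chunks=100000)
da.to_npy_stack('my-folder/', dask_array)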

Upvotes: 1

mdurant

Reputation: 28673

Creating all of your data in the main process and then uploading it to the worker processes is a bad idea! You should always endeavour to load/create the data directly in the workers, which will a) avoid repeating work and copying data and b) keep the data lazy, only materialising it into memory as needed.

In this case, this might look like:

arr = da.random.randint(0, 2, size=(4000000, 8000), chunks=(100000, 8000))
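
Saving then works on the lazy array directly, e.g. (a sketch, assuming the same my-folder/ target as in the question):

# each chunk of random values is generated inside a worker only when
# to_npy_stack writes it, so the full array is never held in memory at once
da.to_npy_stack('my-folder/', arr)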

Upvotes: 1
