Reputation: 1295
I need to save a dask array built from a big numpy array. Below is a minimal working example that shows the process. Note that a is created with numpy.random only for this MWE; unfortunately I cannot create the array with dask.
import numpy as np
import dask.array as da
from dask.distributed import Client
a = np.random.randint(0, 2, size=4000000*8000).reshape((4000000, 8000))
# here the conversion and saving
client = Client(n_workers=90, threads_per_worker=20, processes=True)
dask_array = da.from_array(a, chunks=100000)
da.to_npy_stack('my-folder/', dask_array)
client.close()
The problem I am facing is that a takes around 100GB in memory, but when the dask part runs, memory usage keeps climbing until it nearly fills the available RAM, which is more than 300GB. It then does some computing and, after roughly 10 minutes, I get a memory error. I need the array saved by dask because I have another pipeline (which cannot be connected directly to this one) that uses dask arrays, and reading a dask array back from disk requires the info file (if there is any other method to dump the array and create the info file, I am open to trying it).
Any suggestions on how to speed up and solve this task?
Upvotes: 0
Views: 1188
Reputation: 57251
If you are on a single machine then I recommend using the standard threaded scheduler rather than dask.distributed.Client. You will keep all of the data in the same process this way, and remove the need to make copies of your large NumPy array.
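As a rough sketch of what that could look like (no Client is created, so dask.array falls back to its default threaded scheduler; the small random array here only stands in for the asker's real in-memory a):
import numpy as np
import dask.array as da
# `a` is assumed to already exist in memory, as in the question;
# a small array stands in for it in this sketch
a = np.random.randint(0, 2, size=(1000, 800))
# no Client: the default threaded scheduler runs inside this process,
# so all threads share `a` instead of copying it to worker processes
dask_array = da.from_array(a, chunks=100)
da.to_npy_stack('my-folder/', dask_array)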
Upvotes: 1
Reputation: 28673
Creating all of your data in the main process and then uploading it to the worker processes is a bad idea! You should always endeavour to load/create the data directly in the workers, which will a) avoid repeating work and copying data, and b) keep the data lazy, only materialising it into memory as needed:
In this case, this might look like
arr = da.random.randint(0, 2, size=4000000*8000, chunks=100000).reshape((4000000,8000))
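Combined with the saving step from the question, a sketch could be (the random array is only a stand-in for however the data would really be produced inside the tasks):
import dask.array as da
# build the array lazily; nothing is materialised in the main process
arr = da.random.randint(0, 2, size=4000000*8000, chunks=100000).reshape((4000000, 8000))
# chunks are generated and written as the graph executes, so the full
# 100+ GB array never has to sit in memory at once
da.to_npy_stack('my-folder/', arr)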
Upvotes: 1