Reputation: 878
I am using dask to handle data produced by varying many, many parameters. The goal is a final dask dataframe of 600 000 columns (one per case), built from operations on dask arrays that are themselves constructed from small arrays of shape less than 2000. Here is my final dataframe computed for 6400 cases:
dd_final.compute()
0_95_euclidean 1_95_euclidean 2_95_euclidean 3_95_euclidean ... 96_80_l1 97_80_l1 98_80_l1 99_80_l1
0 0.005670 0.010449 0.010756 0.009914 ... 0.007422 0.002066 0.009693 0.003475
1 0.006255 0.009970 0.007987 0.007785 ... 0.006119 0.002104 0.009638 0.004142
2 0.011956 0.018662 0.016426 0.015260 ... 0.013276 0.003897 0.019816 0.007479
3 0.021639 0.037590 0.036749 0.028090 ... 0.029751 0.009725 0.038956 0.011870
4 0.014482 0.022963 0.025416 0.017909 ... 0.017033 -0.002616 0.026231 0.000978
... ... ... ... ... ... ... ... ... ...
1289 0.597443 1.044522 0.898732 0.940219 ... 0.914094 0.792133 0.744501 0.632575
1290 0.594463 1.041562 0.894501 0.935068 ... 0.913409 0.790555 0.742357 0.628366
1291 0.592523 1.035600 0.891222 0.932510 ... 0.907414 0.786722 0.738844 0.627611
1292 0.606415 1.059642 0.912963 0.951523 ... 0.922719 0.800610 0.751161 0.640515
1293 0.601242 1.049654 0.903112 0.942681 ... 0.915391 0.794133 0.744752 0.636788
[1294 rows x 6400 columns]
First Approach: I use pool.starmap_async for every function to parallelise the operations over 8 CPU cores, then put the results into a dask array.
import multiprocessing
import dask.array as da
import dask.dataframe as dd

def MP_a_func(func, iterable, proc, chunk):
    # run func over iterable on a pool of `proc` worker processes
    pool = multiprocessing.Pool(processes=proc)
    Result = pool.starmap_async(func, iterable, chunksize=chunk)
    return Result

if __name__ == '__main__':
    performances = MP_a_func(Post_processing_Weights, iterable, proc, chunk)
    # .get() blocks until every result is back, so the whole output sits in memory
    da_arr = da.from_array(performances.get(), chunks=chunk)
    # ... some operations
    # ...
    dd_final = dd.from_dask_array(da_arr).repartition(npartitions=chunk)
This approach fails because there is not enough memory to hold the multiprocessing result object before it is stored in the dask array.
Second Approach: I would like to keep pool.starmap_async, but slice my iterable and, for each slice, append the partial result to the dask array:
if __name__ == '__main__':
    for iterable in [iter1, iter2, ..., iter10000]:
        performances = MP_a_func(Post_processing_Weights, iterable, proc, chunk)
        partial_da_arr = da.from_array(performances.get(), chunks=chunk)
        # append or assign to da_arr ??
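Concretely, this is the accumulation I have in mind (only a sketch: sliced_iterables stands for the list of slices above, the concatenation axis is a guess, and I am not sure whether da.from_array keeps every block in memory):

partial_blocks = []
if __name__ == '__main__':
    for iterable in sliced_iterables:   # the [iter1, iter2, ..., iter10000] above
        performances = MP_a_func(Post_processing_Weights, iterable, proc, chunk)
        # each slice is materialised once, then wrapped as a dask block
        partial_blocks.append(da.from_array(performances.get(), chunks=chunk))

    # one lazy array built from all the partial blocks
    da_arr = da.concatenate(partial_blocks, axis=1)
    dd_final = dd.from_dask_array(da_arr).repartition(npartitions=chunk)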
How can I append to (or assign into) the dask array at each step without loading everything into memory, or is there a better approach?
Thank you for your help
Upvotes: 1
Views: 265
Reputation: 16581
600 000 (number of cases or columns)
Judging by the column names in your sample, your workflow might benefit from re-organizing the data, which might also simplify the calculations.
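For instance, if the column names really encode (case, percentile, metric) as the sample suggests, a long layout with one row per (row index, case, percentile, metric) is usually easier to aggregate over. A sketch, assuming dd_final is the frame shown above:

# wide -> long: one row per (original row index, column, value)
long_df = dd_final.reset_index().melt(id_vars="index", var_name="case", value_name="value")
# split a name such as "0_95_euclidean" back into its three parameters
parts = long_df["case"].str.split("_", n=2, expand=True)
long_df = long_df.assign(case_id=parts[0], percentile=parts[1], metric=parts[2])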
How can I append to (or assign into) the dask array at each step without loading everything into memory, or is there a better approach?
You are probably interested in the delayed API, but the question/problem is not sufficiently clear to provide further advice.
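For example, a minimal sketch of what that could look like; the per-case function, the row count and the parameter grid below are placeholders inferred from your sample output, not your actual code:

import numpy as np
import dask
import dask.array as da
import dask.dataframe as dd

n_rows = 1294                     # length of one result column, from the sample output

@dask.delayed
def compute_case(case_id, percentile, metric):
    # stand-in for Post_processing_Weights plus whatever produces one result column
    return np.random.random(n_rows)

# placeholder parameter grid, mimicking column names such as "0_95_euclidean" ... "99_80_l1"
# (the real grid has many more combinations)
params = [(i, q, m) for m in ("euclidean", "l1") for q in (95, 80) for i in range(100)]
columns = [f"{i}_{q}_{m}" for i, q, m in params]

# one lazy 1-D block per case; nothing is computed or held in memory at this point
blocks = [da.from_delayed(compute_case(*p), shape=(n_rows,), dtype=float) for p in params]

arr = da.stack(blocks, axis=1)    # lazy (n_rows, n_cases) array
ddf = dd.from_dask_array(arr, columns=columns)

# writing to disk streams partition by partition instead of materialising the whole frame
ddf.to_parquet("results_parquet")

Run with the processes or distributed scheduler, this should also replace the explicit multiprocessing.Pool, and dask only needs to keep in memory the blocks required by the tasks currently running.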
Upvotes: 1