Reputation: 1469
I just can't explain to myself why dask is creating a process for each task in map_blocks. Here is my code:
EDIT: here is the minimal reproducible example. I run it on Ubuntu 18.04, but that should be negligible:
import os
from multiprocessing.pool import ThreadPool
import dask
import dask.array as da
import h5py
import numpy as np
# --- Configuration and shared input data (read by every worker block) -----
MY_USER_NAME = "myusername"
EARTH_RADIUS = 6372.795  # mean Earth radius, kilometres
CHUNK_SIZE = 5000        # side length of one square dask chunk
N = 20000                # number of points; the output matrix is N x N

# Random test coordinates in degrees, converted to radians up front so the
# per-block kernel only has to do trigonometry, never unit conversion.
lat_vec = np.radians(np.random.random(N) * 90)
lon_vec = np.radians(np.random.random(N) * 180)

# Precompute the latitude sines/cosines once; all blocks reuse these slices.
sin_lat_vec = np.sin(lat_vec)
cos_lat_vec = np.cos(lat_vec)
def _blocked_calculate_great_circle_distance(block, block_info=None):
    """Compute one chunk of the pairwise great-circle distance matrix.

    Uses the spherical law of cosines on the module-level coordinate
    vectors, slicing them by this block's location in the full array:
    see https://en.wikipedia.org/wiki/Great-circle_distance
    and https://github.com/ulope/geopy/blob/master/geopy/distance.py

    Parameters
    ----------
    block : ndarray
        The input chunk (contents unused; only its position matters).
    block_info : dict, optional
        Metadata injected by ``dask.array.map_blocks``; entry 0 carries
        the ``'array-location'`` of this chunk in the overall array.

    Returns
    -------
    ndarray
        Distances in kilometres for the (rows x cols) slice of the matrix.
    """
    (row_start, row_stop), (col_start, col_stop) = \
        block_info[0]['array-location'][:2]

    # Longitude differences for every (row, col) pair via broadcasting.
    delta_lon = (lon_vec[row_start:row_stop, np.newaxis]
                 - lon_vec[col_start:col_stop])

    central_angle = np.arccos(
        sin_lat_vec[row_start:row_stop, np.newaxis]
        * sin_lat_vec[col_start:col_stop]
        + cos_lat_vec[row_start:row_stop, np.newaxis]
        * cos_lat_vec[col_start:col_stop]
        * np.cos(delta_lon)
    )
    return EARTH_RADIUS * central_angle
# Scratch directory for the HDF5 output; create it on first run.
dir_path = "/home/" + MY_USER_NAME + "/minimum_reproducible_example/"
if not os.path.exists(dir_path):
    os.makedirs(dir_path)

# Start from a clean file so the dataset creation below cannot collide
# with a leftover dataset from a previous run.
file_path = os.path.join(dir_path, "matrix.hdf5")
if os.path.exists(file_path):
    os.remove(file_path)

# Pin the threaded scheduler with an explicit pool: tasks run as threads
# in this process, not as separate OS processes.
with dask.config.set(pool=ThreadPool()):
    # h5py >= 3 requires an explicit mode; "w" is safe here because any
    # pre-existing file was just removed above.
    with h5py.File(file_path, "w") as f:
        d_set = f.create_dataset('/data', shape=(N, N), dtype='f4',
                                 fillvalue=0)
        w = da.from_array(d_set, chunks=(CHUNK_SIZE, CHUNK_SIZE))
        w = w.map_blocks(_blocked_calculate_great_circle_distance,
                         chunks=(CHUNK_SIZE, CHUNK_SIZE), dtype='f4')
        # NOTE: da.store has no ``dtype`` parameter — the array already
        # carries dtype 'f4', so the stray kwarg is dropped here.
        da.store(w, d_set)
There are 16 cores available, so I would like to use 16 threads to run the approximately 300 tasks (N² / CHUNK_SIZE²) in parallel. But when I do pstree -p, I see that there are several hundred python processes running. Dask creates so many processes that the overall execution becomes very slow.
Can anybody help me with this?
Thanks!
Upvotes: 0
Views: 304
Reputation: 1469
Thanks to @Malbert's comment: yes, dask creates threads, not processes.
Upvotes: 0
Reputation: 57281
By default dask.array doesn't create any processes, it uses the threaded scheduler.
You can learn more about Dask's schedulers here: https://docs.dask.org/en/latest/scheduling.html
Upvotes: 1