Andy R

Reputation: 1469

Dask creates as many processes as chunks in dask.array.map_blocks

I just can't figure out why dask seems to be creating a process for each task in map_blocks. Here is my code:

EDIT: here is a minimal reproducible example. I run it on Ubuntu 18.04, but that should not matter:

import os
from multiprocessing.pool import ThreadPool

import dask
import dask.array as da
import h5py
import numpy as np

MY_USER_NAME = "myusername"
EARTH_RADIUS = 6372.795
CHUNK_SIZE = 5000
N = 20000

lat_vec = np.random.random(N) * 90
lon_vec = np.random.random(N) * 180
lat_vec = np.radians(lat_vec)
lon_vec = np.radians(lon_vec)
sin_lat_vec = np.sin(lat_vec)
cos_lat_vec = np.cos(lat_vec)


def _blocked_calculate_great_circle_distance(block, block_info=None):
    loc = block_info[0]['array-location']
    (row_start, row_stop) = loc[0]
    (col_start, col_stop) = loc[1]

    # see https://en.wikipedia.org/wiki/Great-circle_distance
    # and https://github.com/ulope/geopy/blob/master/geopy/distance.py
    row_lon = lon_vec[row_start:row_stop]
    col_lon = lon_vec[col_start:col_stop]

    delta_lon = row_lon[:, np.newaxis] - col_lon
    cos_delta_lon = np.cos(delta_lon)
    central_angle = np.arccos(sin_lat_vec[row_start:row_stop, np.newaxis] * sin_lat_vec[col_start:col_stop] +
                              cos_lat_vec[row_start:row_stop, np.newaxis] * cos_lat_vec[col_start:col_stop]
                              * cos_delta_lon)

    return EARTH_RADIUS * central_angle


dir_path = "/home/" + MY_USER_NAME + "/minimum_reproducible_example/"
if not os.path.exists(dir_path):
    os.makedirs(dir_path)

file_path = os.path.join(dir_path, "matrix.hdf5")
if os.path.exists(file_path):
    os.remove(file_path)

with dask.config.set(pool=ThreadPool()):
    with h5py.File(file_path, "a") as f:  # explicit mode; newer h5py versions no longer default to "a"
        d_set = f.create_dataset('/data', shape=(N, N), dtype='f4', fillvalue=0)

        w = da.from_array(d_set, chunks=(CHUNK_SIZE, CHUNK_SIZE))

        w = w.map_blocks(_blocked_calculate_great_circle_distance, chunks=(CHUNK_SIZE, CHUNK_SIZE), dtype='f4')

        da.store(w, d_set)  # the dataset is already float32 ('f4'); store() does not take a dtype argument

There are 16 cores available, so I would like to use 16 threads to run the tasks in parallel: map_blocks produces (N / CHUNK_SIZE)² tasks, i.e. 16 in this example (around 300 with my real data). But when I do pstree -p, I see several hundred python processes running. Dask creates so many processes that the overall execution becomes very slow.
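For reference, this is the variant I would expect to cap the worker count, a minimal sketch of the config line above with an explicit pool size (the 16 just matches my core count):

# Same script as above, but with the pool size pinned to 16 threads:
with dask.config.set(pool=ThreadPool(16)):
    da.store(w, d_set)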

Can anybody help me with this?

Thanks!

Upvotes: 0

Views: 304

Answers (2)

Andy R

Reputation: 1469

Thanks to @Malbert's comment: yes, dask creates threads, not processes.

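A quick way to confirm this from inside the script, a minimal sketch using only the standard library (/proc/self/task is Linux-specific):

import os
import threading

# Python-level threads known to the interpreter:
print(threading.active_count())

# OS-level threads of this process (on Linux, one entry per thread):
print(len(os.listdir("/proc/self/task")))

Note that pstree -p prints threads in curly braces, e.g. {python}, which is easy to mistake for separate processes.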

Upvotes: 0

MRocklin

Reputation: 57281

By default, dask.array doesn't create any processes; it uses the threaded scheduler.

You can learn more about Dask's schedulers here: https://docs.dask.org/en/latest/scheduling.html
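For example, the scheduler and the thread count can be selected explicitly; a minimal sketch (the array here is just a stand-in):

import dask
import dask.array as da

x = da.ones((20000, 20000), chunks=(5000, 5000))

# One-off: run on the threaded scheduler with at most 16 threads.
y = x.sum().compute(scheduler="threads", num_workers=16)

# Or set the scheduler globally for subsequent computations:
dask.config.set(scheduler="threads")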

Upvotes: 1
