rpanai

Reputation: 13437

Dask - Is it possible to use all threads in every worker with a custom function?

In my case I have several files in S3 and a custom function that reads each one of them and processes it using all threads. To simplify the example I just generate a dataframe df and assume that my function is tsfresh.extract_features, which uses multiprocessing.

Generate Data

import pandas as pd
from tsfresh import extract_features
from tsfresh.examples.robot_execution_failures import download_robot_execution_failures, \
load_robot_execution_failures
download_robot_execution_failures()
ts, y = load_robot_execution_failures()
df = []
for i in range(5):
    tts = ts.copy()
    tts["id"] += 88 * i
    df.append(tts)
    
df = pd.concat(df, ignore_index=True)

Function

def fun(df, n_jobs):
    extracted_features = extract_features(df,
                                          column_id="id",
                                          column_sort="time",
                                          n_jobs=n_jobs)
    return extracted_features

Cluster

import dask
from dask.distributed import Client, progress
from dask import compute, delayed
from dask_cloudprovider import FargateCluster

my_vpc = # your vpc
my_subnets = # your subnets

cpu = 2 
ram = 4
cluster = FargateCluster(n_workers=1,
                         image='rpanai/feats-worker:2020-08-24',
                         vpc=my_vpc,
                         subnets=my_subnets,
                         worker_cpu=int(cpu * 1024),
                         worker_mem=int(ram * 1024),
                         cloudwatch_logs_group="my_log_group",
                         task_role_policies=['arn:aws:iam::aws:policy/AmazonS3FullAccess'],
                         scheduler_timeout='20 minutes'
                        )


cluster.adapt(minimum=1,
              maximum=4)
client = Client(cluster)
client

Using all worker threads (FAIL)

to_process = [delayed(fun)(df, cpu) for i in range(10)]
out = compute(to_process)

This fails with:

AssertionError: daemonic processes are not allowed to have children

Using only one thread (OK)

In this case it works fine but I'm wasting resources.

to_process = [delayed(fun)(df, 0) for i in range(10)]
out = compute(to_process)

Question

I know that for this particular function I could eventually write a custom distributor using multithreading and a few other tricks, but I'd like to distribute a job where, on every worker, I can take advantage of all resources without having to worry too much.

Update

The function was just an example: in reality there is some cleaning before the actual feature extraction, and afterwards the result is saved to S3.

def fun(filename, bucket_name, filename_out, n_jobs):
    # read the raw data from S3
    df = pd.read_parquet(f"s3://{bucket_name}/{filename}")
    # do some cleaning
    extracted_features = extract_features(df,
                                          column_id="id",
                                          column_sort="time",
                                          n_jobs=n_jobs)
    # save the result back to S3
    extracted_features.to_parquet(f"s3://{bucket_name}/{filename_out}")
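It is then dispatched the same way as before, e.g. (the bucket and file names below are just placeholders):

filenames = ["data_01.parquet", "data_02.parquet"]  # placeholder S3 keys
to_process = [delayed(fun)(f, "my-bucket", f"feats_{f}", 0) for f in filenames]
out = compute(to_process)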

Upvotes: 2

Views: 625

Answers (1)

nilpferd1991

Reputation: 166

I can help answer your specific question for tsfresh, but if tsfresh was just a simple toy example, this might not be what you want.

For tsfresh, you would typically not mix tsfresh's multiprocessing with dask, but let dask do all the handling. This means you start with a single dask.DataFrame (in your test case, you can just convert the pandas dataframe into a dask one; for your real use case you can read directly from S3, see the dask documentation), and then distribute the feature extraction over the dask dataframe. The nice thing about the feature extraction is that it works independently on every time series, so we can generate a single job for every time series.
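For example (a minimal sketch; the partition count is arbitrary and the S3 path is a placeholder):

import dask.dataframe as dd

# Turn the in-memory pandas dataframe into a dask dataframe ...
ddf = dd.from_pandas(df, npartitions=8)

# ... or, for the real use case, read the parquet files directly from S3:
# ddf = dd.read_parquet("s3://my-bucket/my-prefix/*.parquet")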

The current version of tsfresh (0.16.0) has a small helper function that will do this for you (see the tsfresh documentation). In the next version, it might even be possible to just run extract_features on the dask dataframe directly.
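Roughly, the usage with that helper looks like this (a sketch only; the melt to long format and the chosen column names depend on your data layout, so check the docs for the exact signature):

import dask.dataframe as dd
from tsfresh.convenience.bindings import dask_feature_extraction_on_chunk
from tsfresh.feature_extraction import ComprehensiveFCParameters

# The dask binding expects the data in long format: one row per
# (id, time, kind, value) combination, grouped by id and kind.
ddf_long = dd.melt(ddf, id_vars=["id", "time"],
                   var_name="kind", value_name="value")
ddf_grouped = ddf_long.groupby(["id", "kind"])

features = dask_feature_extraction_on_chunk(
    ddf_grouped,
    column_id="id",
    column_kind="kind",
    column_sort="time",
    column_value="value",
    default_fc_parameters=ComprehensiveFCParameters(),
)
# the result comes back in long format and typically needs
# pivoting/reshaping into the usual one-row-per-id feature matrix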

I am not sure if this helps to solve your more general question. In my opinion, in most cases you do not want to mix dask's distribution with "local" multicore calculation, but just let dask handle everything. If you are on a dask cluster, you might not even know how many cores you will have on each of the machines (or you might only get a single one per job).

This means if your job can be distributed N times and each of them will start M sub-jobs, you just give "N x M" jobs to dask and let it figure out the rest (including data locality).
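Applied to your toy example, that could look roughly like this (just a sketch): one single-threaded extract_features call per time series instead of a handful of multi-process calls.

# one job per time series: dask schedules N x M small tasks across all
# worker threads, and each task stays single-threaded (n_jobs=0)
tasks = [
    delayed(extract_features)(single_id_df,
                              column_id="id",
                              column_sort="time",
                              n_jobs=0)
    for _, single_id_df in df.groupby("id")
]
out = compute(*tasks)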

Upvotes: 2
