Reputation: 13437
In my case I have several files in S3 and a custom function that reads each one of them and processes it using all threads. To simplify the example, I just generate a dataframe df and assume that my function is tsfresh.extract_features, which uses multiprocessing.
import pandas as pd
from tsfresh import extract_features
from tsfresh.examples.robot_execution_failures import download_robot_execution_failures, \
    load_robot_execution_failures

download_robot_execution_failures()
ts, y = load_robot_execution_failures()

# build a larger example dataframe by stacking shifted copies of the demo data
df = []
for i in range(5):
    tts = ts.copy()
    tts["id"] += 88 * i
    df.append(tts)
df = pd.concat(df, ignore_index=True)

def fun(df, n_jobs):
    extracted_features = extract_features(df,
                                          column_id="id",
                                          column_sort="time",
                                          n_jobs=n_jobs)
    return extracted_features
import dask
from dask.distributed import Client, progress
from dask import compute, delayed
from dask_cloudprovider import FargateCluster
my_vpc = ...      # your vpc
my_subnets = ...  # your subnets
cpu = 2
ram = 4
cluster = FargateCluster(n_workers=1,
image='rpanai/feats-worker:2020-08-24',
vpc=my_vpc,
subnets=my_subnets,
worker_cpu=int(cpu * 1024),
worker_mem=int(ram * 1024),
cloudwatch_logs_group="my_log_group",
task_role_policies=['arn:aws:iam::aws:policy/AmazonS3FullAccess'],
scheduler_timeout='20 minutes'
)
cluster.adapt(minimum=1,
maximum=4)
client = Client(cluster)
client
Submitting the jobs with tsfresh's multiprocessing enabled inside each task fails:

to_process = [delayed(fun)(df, cpu) for i in range(10)]
out = compute(to_process)

AssertionError: daemonic processes are not allowed to have children
With n_jobs=0 (no multiprocessing inside the task) it works fine, but I'm wasting resources:

to_process = [delayed(fun)(df, 0) for i in range(10)]
out = compute(to_process)
I know that for this particular function I could eventually write a custom distributor using multithreading and a few other tricks, but I'd like to distribute a job where every worker can take advantage of all its resources without me having to worry too much.
The function above is just an example; the real one does some cleaning before the actual feature extraction and saves the result to S3 afterwards:
def fun(filename, bucket_name, filename_out, n_jobs):
    # read the raw data from S3
    df = pd.read_parquet(f"s3://{bucket_name}/{filename}")
    # do some cleaning
    extracted_features = extract_features(df,
                                          column_id="id",
                                          column_sort="time",
                                          n_jobs=n_jobs)
    # write the features back to S3
    extracted_features.to_parquet(f"s3://{bucket_name}/{filename_out}")
Upvotes: 2
Views: 625
Reputation: 166
I can help answer your specific question for tsfresh, but if tsfresh was just a simple toy example, this might not be what you want.
For tsfresh, you would typically not mix tsfresh's own multiprocessing with dask, but let dask do all the handling. That means you start with a single dask.DataFrame (in your test case you can just convert the pandas dataframe into a dask one; for your real use case you can read directly from S3, see the dask documentation), and then distribute the feature extraction over the dask dataframe. The nice thing about the feature extraction is that it works independently on every time series, so we can generate a single job for every time series, as sketched below.
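For example, the first step could look roughly like this (just a sketch; the number of partitions and the S3 path are placeholders, not something from your setup):

import dask.dataframe as dd

# Test case: wrap the pandas dataframe df from your example in a dask dataframe.
# npartitions controls how many chunks dask can schedule independently.
ddf = dd.from_pandas(df, npartitions=8)

# Real use case: read the parquet files directly from S3 instead
# (bucket and prefix below are placeholders).
# ddf = dd.read_parquet("s3://your-bucket/your-prefix/*.parquet")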
The current version of tsfresh (0.16.0) has a small helper function that will do this for you: see here.
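A rough sketch of how that helper can be used (please double-check the exact module path and arguments against the tsfresh documentation; it also assumes your data has already been reshaped into tsfresh's long format with columns id, kind, time and value):

from tsfresh.convenience.bindings import dask_feature_extraction_on_chunk
from tsfresh.feature_extraction.settings import EfficientFCParameters

# group the long-format dask dataframe (ddf from the sketch above) so that
# every chunk is one time series of one kind
grouped = ddf.groupby(["id", "kind"])

features = dask_feature_extraction_on_chunk(grouped,
                                            column_id="id",
                                            column_kind="kind",
                                            column_sort="time",
                                            column_value="value",
                                            default_fc_parameters=EfficientFCParameters())

# features is again a lazy dask dataframe; nothing runs until you call
# features.compute() or write it out, e.g. to parquet on S3.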
In the next version, it might even be possible to just run extract_features on the dask dataframe directly.
I am not sure if this helps with your more general question. In my opinion, you (in most cases) do not want to mix dask's distribution and "local" multicore calculation, but should just let dask handle everything. If you are on a dask cluster, you might not even know how many cores you will have on each machine (or you might only get a single one per job).
This means: if your job can be distributed N ways and each of them would start M sub-jobs, you just give N x M jobs to dask and let it figure out the rest (including data locality), as in the sketch below.
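Applied to your example, that could look like the following sketch (dataframes stands for a hypothetical list of already-cleaned pandas dataframes, one per S3 file; n_jobs=0 switches tsfresh's own multiprocessing off so that dask alone does the scheduling):

from dask import compute, delayed
from tsfresh import extract_features

@delayed
def extract_one(chunk):
    # single-threaded feature extraction on one time series
    return extract_features(chunk, column_id="id", column_sort="time", n_jobs=0)

# instead of N tasks that each fan out into M sub-processes, build one flat
# list of N x M small tasks and let dask place them on the cluster
tasks = [extract_one(chunk)
         for df in dataframes               # N dataframes (hypothetical list)
         for _, chunk in df.groupby("id")]  # M time series per dataframe
out = compute(*tasks)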
Upvotes: 2