Reputation: 16581
Consider a Prefect task whose memory requirements are not known in advance. If the task fails because the worker does not have sufficient memory, is it possible to modify the Dask worker parameters and re-run the task?

It would be great if there were a way to increase the memory allocation per worker by some value after each failure.
Upvotes: 0
Views: 608
Reputation: 1758
It's hard to give a general answer because it depends on your infrastructure.
If you define the cluster class ad hoc per flow run, you could pass a dynamic function to the DaskExecutor's cluster_class. This function could retrieve values such as n_workers from a Parameter task, as follows:

import prefect
from prefect import Flow, Parameter
from prefect.executors import DaskExecutor

def dynamic_executor():
    from distributed import LocalCluster

    # could instead be some other cluster class,
    # e.g. from dask_cloudprovider.aws import FargateCluster
    return LocalCluster(n_workers=prefect.context.parameters["n_workers"])

with Flow(
    "dynamic_n_workers", executor=DaskExecutor(cluster_class=dynamic_executor)
) as flow:
    flow.add_task(Parameter("n_workers", default=5))
This means that you could start a new flow run with a different value of n_workers defined ad hoc.
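For example, assuming the flow above, you could rerun it locally with a larger worker count (the value 10 is just illustrative):

flow.run(parameters={"n_workers": 10})

If a run fails with an OOM error, you can repeat this with a higher n_workers value; the same applies to runs triggered from the UI or CLI, where parameters can be overridden per run.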
Another option is to override the memory_request set on a KubernetesRun from the UI:

with Flow(
    FLOW_NAME,
    storage=STORAGE,
    run_config=KubernetesRun(
        labels=["k8s"],
        cpu_request=0.5,
        memory_request="2Gi",
    ),
) as flow:
The above code snippet requests 2 GiB of memory; if you notice that your flow run ended with an OOM error and you need more, you can trigger a new flow run from the UI with a higher memory request.
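If you'd rather not click through the UI, a minimal sketch of the same idea with the Prefect 1.x Client could look like this; the flow_id placeholder and the specific resource values are hypothetical:

from prefect import Client
from prefect.run_configs import KubernetesRun

client = Client()
# trigger a new run of an already registered flow,
# overriding its run config with a higher memory request
client.create_flow_run(
    flow_id="...",  # replace with your registered flow's ID
    run_config=KubernetesRun(labels=["k8s"], cpu_request=0.5, memory_request="4Gi"),
)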
Finally, you could set the memory limits directly on the executor's cluster class, e.g. on a Coiled cluster:

import coiled
from prefect.executors import DaskExecutor

flow.executor = DaskExecutor(
    cluster_class=coiled.Cluster,
    cluster_kwargs={
        "software": "user/software_env_name",
        "shutdown_on_close": True,
        "name": "prefect-cluster",
        "scheduler_memory": "4 GiB",
        "worker_memory": "8 GiB",
    },
)
As long as you use script storage (e.g. one of the Git storage classes such as GitHub, Git, GitLab, or Bitbucket) rather than pickle storage, and you commit your code with a modified value of worker_memory, this should be reflected in your new flow run because metadata about the executor is not stored on the backend; it's retrieved from your flow storage.
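To get closer to the behavior asked about in the question (raising worker memory per run without a new commit), you could combine this with the dynamic cluster_class trick from the first example. The sketch below is an assumption-level adaptation; the worker_memory Parameter name and its default are chosen just for illustration:

import prefect
from prefect import Flow, Parameter
from prefect.executors import DaskExecutor

def dynamic_coiled_cluster():
    import coiled
    # read the desired worker memory from a Parameter at runtime,
    # so a retried flow run can pass a higher value without a code change
    return coiled.Cluster(
        software="user/software_env_name",
        shutdown_on_close=True,
        name="prefect-cluster",
        scheduler_memory="4 GiB",
        worker_memory=prefect.context.parameters["worker_memory"],
    )

with Flow(
    "dynamic_worker_memory", executor=DaskExecutor(cluster_class=dynamic_coiled_cluster)
) as flow:
    flow.add_task(Parameter("worker_memory", default="8 GiB"))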
Upvotes: 2