SultanOrazbayev

Reputation: 16581

If a `prefect` task fails, is it possible to re-run it with different `dask` parameters?

Consider a prefect task whose memory requirements are not known in advance. If the task fails because the worker does not have sufficient memory, is it possible to modify the dask worker parameters and re-run the task?

It would be great if there were a way to increase the memory allocation per worker by some amount after each failure.

Upvotes: 0

Views: 608

Answers (1)

Anna Geller

Reputation: 1758

It's hard to give a general answer because it depends on your infrastructure.

  1. For instance, if you want to provide custom keyword arguments to a Dask cluster class ad-hoc per flow run, you could pass a callable as the DaskExecutor's cluster_class. This callable can retrieve values such as n_workers from a Parameter task, as follows:
import prefect
from prefect import Flow, Parameter
from prefect.executors import DaskExecutor

def dynamic_executor():
    from distributed import LocalCluster

    # could be instead some other class e.g. from dask_cloudprovider.aws import FargateCluster
    return LocalCluster(n_workers=prefect.context.parameters["n_workers"])

with Flow(
    "dynamic_n_workers", executor=DaskExecutor(cluster_class=dynamic_executor)
) as flow:
    flow.add_task(Parameter("n_workers", default=5))

This means that you could start a new flow run with a different value of n_workers defined ad-hoc.
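For illustration, here is a minimal sketch of kicking off such a run programmatically with the Prefect 1.x Client (the flow ID and the parameter value are placeholders, not part of the original answer):

from prefect import Client

client = Client()
# "your-flow-id" is a placeholder - use the ID of your registered flow;
# the previous run failed, so this retry run requests more workers ad-hoc
client.create_flow_run(
    flow_id="your-flow-id",
    parameters={"n_workers": 10},
)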

  2. The second option would be to assign more memory in your run configuration on a per-flow-run basis - e.g. you could override the memory_request set on a KubernetesRun from the UI:
from prefect import Flow
from prefect.run_configs import KubernetesRun

# FLOW_NAME and STORAGE are placeholders for your flow name and storage object
with Flow(
    FLOW_NAME,
    storage=STORAGE,
    run_config=KubernetesRun(
        labels=["k8s"],
        cpu_request=0.5,
        memory_request="2Gi",
    ),
) as flow:
    ...  # add your tasks here

The above code snippet requests 2 GiB, but if your flow run ends with an OOM error and you need more, you can trigger a new flow run from the UI with a higher memory request.
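If you prefer not to go through the UI, Client.create_flow_run also accepts (to the best of my knowledge) a run_config argument that overrides the stored run configuration for that single run; a rough sketch with placeholder values:

from prefect import Client
from prefect.run_configs import KubernetesRun

client = Client()
# the overridden run_config applies only to this flow run;
# "your-flow-id" is a placeholder for your registered flow's ID
client.create_flow_run(
    flow_id="your-flow-id",
    run_config=KubernetesRun(labels=["k8s"], cpu_request=0.5, memory_request="4Gi"),
)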

  3. The last option would be to override the executor values directly in your flow definition:
import coiled
from prefect.executors import DaskExecutor

flow.executor = DaskExecutor(
    cluster_class=coiled.Cluster,
    cluster_kwargs={
        "software": "user/software_env_name",
        "shutdown_on_close": True,
        "name": "prefect-cluster",
        "scheduler_memory": "4 GiB",
        "worker_memory": "8 GiB",
    },
)

As long as you use script storage (e.g. one of the Git storage classes such as GitHub, Git, GitLab, or Bitbucket) rather than pickle storage, and you commit your code with a modified value of worker_memory, the change will be reflected in your new flow run, because metadata about the executor is not stored in the backend - it's retrieved from your flow storage.
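To tie this back to the original question: if you want to avoid a new commit for every memory bump, you could combine option 1 with option 3 and read worker_memory from a Parameter inside a callable cluster_class, so each new flow run can request more worker memory ad-hoc. A rough sketch under those assumptions (the Coiled software environment name, flow name, and defaults are placeholders):

import coiled
import prefect
from prefect import Flow, Parameter
from prefect.executors import DaskExecutor

def dynamic_coiled_cluster():
    # worker_memory is read from the flow-run parameters,
    # so a retried run can simply pass a larger value, e.g. "16 GiB"
    return coiled.Cluster(
        software="user/software_env_name",  # placeholder software environment
        shutdown_on_close=True,
        name="prefect-cluster",
        scheduler_memory="4 GiB",
        worker_memory=prefect.context.parameters["worker_memory"],
    )

with Flow(
    "dynamic_worker_memory",
    executor=DaskExecutor(cluster_class=dynamic_coiled_cluster),
) as flow:
    flow.add_task(Parameter("worker_memory", default="8 GiB"))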

Upvotes: 2
