Reputation: 4255
I have a long flow and I would like to mark its state as Failed
if a specific task fails.
I read the docs here, but they don't seem to cover such a scenario.
My guess is that I need to specify this somehow in the @task()
decorator, but I don't know how.
Any guidance is much appreciated.
Upvotes: 1
Views: 806
Reputation: 1400
This can be achieved by declaring your specific task a "reference task" of your Flow. Reference tasks are the tasks that ultimately determine the final state for each flow run.
For example, the following code snippet creates a flow with two independent, unrelated tasks that each randomly fail half the time. However, the overall flow run state is determined solely by the state of task_one, because we specify that task as the only reference task:
import random

from prefect import task, Flow


@task
def task_one():
    if random.random() > 0.5:
        raise ValueError("Random failure")


@task
def task_two():
    if random.random() > 0.5:
        raise ValueError("Random failure")


flow = Flow("Two Task Flow", tasks=[task_one, task_two])
flow.set_reference_tasks([task_one])
Upvotes: 1