imarevic

Reputation: 59

Airflow multiple runs of different task branches

Let's assume we have two tasks implemented as Airflow operators: task_1 and task_2. We want to skip task_1 on Mondays and run both tasks on all other days. In addition, we want to re-run both tasks on Monday at a later time.

Example of a Monday run:

Run 1: task_2

Run 2: task_1 -> task_2

Example of runs on the rest of the week:

Run 1: task_1 -> task_2

So on Mondays the first task is skipped in the first run and both tasks run in a later second run, whereas on all other days both tasks run as usual.

We already looked at the BranchDayOfWeekOperator (https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/weekday.html), but it does not solve the second re-run on Mondays.

Can you help us with the re-run issue? Thanks in advance.

Upvotes: 0

Views: 958

Answers (1)

Hussein Awala

Reputation: 5110

If you want to use BranchDayOfWeekOperator, you can skip task_1 on Monday and execute it on the other days. Set both the branch task and task_1 as upstream of task_2, and set the trigger rule of task_2 to ONE_SUCCESS so that it executes for both branches:

from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.weekday import BranchDayOfWeekOperator, WeekDay
from airflow.utils.trigger_rule import TriggerRule


with DAG(
    'dag',
    schedule_interval="@daily",
    start_date=datetime(2022, 12, 1),
) as dag:
    # On Monday go straight to task_2; on any other day go through task_1.
    branch = BranchDayOfWeekOperator(
        task_id="skip_monday",
        follow_task_ids_if_true="task_2",
        follow_task_ids_if_false="task_1",
        week_day={WeekDay.MONDAY},
    )
    task_1 = EmptyOperator(task_id="task_1")
    # task_2 has two upstream tasks (branch and task_1), one of which may be
    # skipped, so it must not use the default ALL_SUCCESS trigger rule.
    task_2 = EmptyOperator(task_id="task_2", trigger_rule=TriggerRule.ONE_SUCCESS)
    branch >> [task_1, task_2]
    task_1 >> task_2
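
One caveat about the trigger rule may be worth checking. As far as the Airflow documentation describes it, ONE_SUCCESS fires as soon as at least one upstream task has succeeded and does not wait for the other upstream tasks, so on non-Monday runs task_2 could start once the branch task succeeds, before task_1 has finished. NONE_FAILED_MIN_ONE_SUCCESS waits for all upstream tasks. The sketch below is a simplified pure-Python model of those two documented rules, not Airflow's implementation; the function names and state strings are made up for illustration.

```python
from typing import Dict, Optional

# Hypothetical state labels for this sketch; None means "not finished yet".
State = Optional[str]
FINISHED = {"success", "skipped", "failed", "upstream_failed"}


def one_success_ready(upstream: Dict[str, State]) -> bool:
    """Model of ONE_SUCCESS: ready as soon as any upstream task succeeded."""
    return any(state == "success" for state in upstream.values())


def none_failed_min_one_success_ready(upstream: Dict[str, State]) -> bool:
    """Model of NONE_FAILED_MIN_ONE_SUCCESS: all upstream tasks are finished,
    none of them failed, and at least one succeeded."""
    states = list(upstream.values())
    all_done = all(state in FINISHED for state in states)
    none_failed = not any(state in {"failed", "upstream_failed"} for state in states)
    return all_done and none_failed and "success" in states


# Tuesday, right after the branch task finished: task_1 is still running.
tuesday_mid_run = {"skip_monday": "success", "task_1": None}
# Monday: the branch task skipped task_1.
monday = {"skip_monday": "success", "task_1": "skipped"}

print(one_success_ready(tuesday_mid_run))                  # True
print(none_failed_min_one_success_ready(tuesday_mid_run))  # False
print(one_success_ready(monday))                           # True
print(none_failed_min_one_success_ready(monday))           # True
```

If task_2 must wait for task_1 on non-Monday runs, trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS may be the safer choice in the DAG above; it is worth verifying the behavior against your Airflow version.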

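Note that by default BranchDayOfWeekOperator checks the weekday of the date on which the task actually runs (its start date), not the run's logical date. If you want to skip the runs scheduled for Monday rather than every run that happens to be executed on a Monday (for example backfills or cleared runs), add the argument use_task_logical_date=True.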
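
To illustrate the difference, here is a small pure-Python sketch (not Airflow code). The helper choose_branch and the example dates are hypothetical; it only mimics the weekday check against either the time the task runs or the run's logical date.

```python
from datetime import datetime

MONDAY = 0  # datetime.weekday() returns 0 for Monday


def choose_branch(reference: datetime) -> str:
    """Return the task to follow, mirroring the branch in the DAG above."""
    return "task_2" if reference.weekday() == MONDAY else "task_1"


# A run whose logical date is Tuesday 2022-12-06 ...
logical_date = datetime(2022, 12, 6)
# ... but which is cleared and re-executed on Monday 2022-12-12.
executed_at = datetime(2022, 12, 12, 9, 30)

# Default behaviour (modelled): the check uses the time the task actually runs.
print(choose_branch(executed_at))   # task_2, so task_1 is skipped
# With use_task_logical_date=True (modelled): the check uses the logical date.
print(choose_branch(logical_date))  # task_1, so both tasks run
```

Also keep in mind that, as far as I understand Airflow's scheduling, a @daily run's logical date is the start of its data interval, so the scheduled run that starts on Monday carries Sunday's logical date. Check which of the two dates you actually want to treat as "Monday".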

Regarding "In addition we also want to re-run both tasks on Monday at a later time":

If you want to run it later, you can run it manually from the Airflow UI with Ignore All Deps activated.

Upvotes: 2
