alltej

Reputation: 7285

Airflow set task instance status as skipped programmatically

I have a list that I loop over to create the tasks. The size of the list is fixed.

        for counter, account_id in enumerate(ACCOUNT_LIST):
            task_id = f"bash_task_{counter}"
            if account_id:
                trigger_task = BashOperator(
                    task_id=task_id,
                    bash_command="echo hello there",
                    dag=dag)
            else:
                trigger_task = BashOperator(
                    task_id=task_id,
                    bash_command="echo hello there",
                    dag=dag)
                trigger_task.status = SKIPPED # is there way to somehow set status of this to skipped instead of having a branch operator?
            trigger_task

I also tried raising the exception manually, but I cannot get the tasks to be skipped:

        start = DummyOperator(task_id='start')
        task1 = DummyOperator(task_id='task_1')
        task2 = DummyOperator(task_id='task_2')
        task3 = DummyOperator(task_id='task_3')
        task4 = DummyOperator(task_id='task_4')

        start >> task1
        start >> task2

        try:
            start >> task3
            raise AirflowSkipException
        except AirflowSkipException as ase:
            log.error('Task Skipped for task3')
            
        try:
            start >> task4
            raise AirflowSkipException
        except AirflowSkipException as ase:
            log.error('Task Skipped for task4')

Upvotes: 1

Views: 13019

Answers (2)

alltej

Reputation: 7285

Use a fixed number of tasks per DAG. This works well, and it also forces you to plan the maximum number of parallel tasks your system can handle without degrading downstream systems. A fixed number of tasks also keeps every task visible in the web UI, which shows whether each one was executed or skipped.

In the code below, I initialize the list with None items and then fill it with values returned from the DB. The python_callable function checks the account_id: if it is None, it raises an AirflowSkipException; otherwise it runs the task logic. In the UI, every task is visible and shows whether it was executed or skipped (meaning there was no account_id).

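    # Import paths assume Airflow 2.x; on 1.10, PythonOperator lives in
    # airflow.operators.python_operator.
    from airflow.exceptions import AirflowSkipException
    from airflow.operators.python import PythonOperator


    def execute(account_id):
        # Run the work only when an account_id was supplied; otherwise skip the task.
        if account_id:
            print(f'************Executing task for account_id:{account_id}')
        else:
            raise AirflowSkipException

    def create_task(task_id, account_id):
        # Assumes this is called inside a `with DAG(...)` block, since no dag= is passed.
        return PythonOperator(task_id=task_id,
                              python_callable=execute,
                              op_args=[account_id])


    list_from_dbhook = [1, 2, 3]  # dummy list. Get records using DB Hook

    # Use a fixed size so that a fixed number of tasks (and resources) is allocated.
    # A fixed number of tasks keeps them visible in the UI instead of being purely dynamic.
    record_size_limit = 5

    # Pad the list with None so that unused slots become skipped tasks.
    ACCOUNT_LIST = [None] * record_size_limit
    for index, account_id_val in enumerate(list_from_dbhook):
        ACCOUNT_LIST[index] = account_id_val

    for idx, acct_id in enumerate(ACCOUNT_LIST):
        task = create_task(f"task_{idx}", acct_id)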
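
The padding step can also be pulled into a small helper, which makes the fixed-size behaviour easy to check outside Airflow. This is only a sketch: `pad_to_fixed_size` is a hypothetical name, not an Airflow function, and dropping records beyond the limit is an assumption on my part (the loop above would raise an IndexError if the DB returned more rows than record_size_limit).

```python
from typing import List, Optional, Sequence


def pad_to_fixed_size(records: Sequence[int], size: int) -> List[Optional[int]]:
    """Return exactly `size` slots: the records first, then None placeholders."""
    kept = list(records[:size])  # assumption: ignore anything beyond `size`
    return kept + [None] * (size - len(kept))


ACCOUNT_LIST = pad_to_fixed_size([1, 2, 3], 5)

# Slots holding None are the ones whose tasks would raise AirflowSkipException.
skipped_task_ids = [
    f"task_{idx}" for idx, acct in enumerate(ACCOUNT_LIST) if acct is None
]
```

With three records and a limit of five, the last two slots stay None, so the last two tasks would be the skipped ones.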


Upvotes: 0

Mike Taylor

Reputation: 699

Yes, you need to raise AirflowSkipException:

    from airflow.exceptions import AirflowSkipException

    raise AirflowSkipException

For more information see the source code
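
A minimal sketch of where the exception has to be raised: it needs to come from inside the callable that runs when the task executes. Raising it in the DAG file's top-level code, as in the question's try/except, only runs at parse time and is caught before any task runs. `process_account` is a hypothetical callable name, and the fallback class is just a stand-in so the snippet runs where Airflow is not installed.

```python
try:
    from airflow.exceptions import AirflowSkipException
except ImportError:
    # Stand-in for environments without Airflow; NOT the real class.
    class AirflowSkipException(Exception):
        pass


def process_account(account_id=None):
    """Hypothetical python_callable: skip when there is no account to process."""
    if account_id is None:
        # Raised while the task is running, so Airflow marks it as skipped.
        raise AirflowSkipException("No account_id supplied, skipping")
    return f"processed account {account_id}"
```

Passed to a PythonOperator as `python_callable=process_account` with `op_args=[account_id]`, this should leave the task in the skipped state whenever account_id is None.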

Upvotes: 10
