Gaëtan


Why does BranchPythonOperator make my DAG fail?

I am new to Airflow and I have a simple toy DAG that I use to experiment with its features. A while back, I tested the BranchPythonOperator and it worked fine, but today it makes my DAG fail. The weird part is that it is not the branching task itself that fails, but the first task of the DAG. Even weirder (and more annoying), there are no logs, so I don't know what causes the first task to fail. Here is my DAG without the branching:

[Image: successful graph]

And here it is with the branching:

[Image: failing graph]

And here is the code:

from datetime import timedelta, datetime
from textwrap import dedent

# Airflow imports
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.utils.dates import days_ago
from airflow.utils.trigger_rule import TriggerRule
from time import sleep

# Dictionary containing inputs that tasks have in common
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Create the DAG object
with DAG(
    'test_dag',
    default_args=default_args,
    description='A test DAG to mess around with Airflow',
    schedule_interval=timedelta(days=1),
    start_date=days_ago(2),
    tags=['example'],
) as dag:

    # First task
    def start(**kwargs):
        return "Start"
    
    start = PythonOperator(
        task_id='start',
        python_callable=start,
        provide_context=True
    )

    # Second task, calls a Python function to decide which branch to follow
    def is_afternoon(**kwargs):
        date = str(datetime.now())
        hour = int(date.split()[1].split(':')[0])
        if hour<12:
            return "morning"
        else:
            return "afternoon"
    
    fork = BranchPythonOperator(
        task_id='fork',
        python_callable=is_afternoon,
        provide_context=True
    )

    # Third task is different depending on the time of day
    def print_morning(**kwargs):
        print('morning')
        return 'morning'
    
    def print_afternoon(**kwargs):
        print('afternoon')
        return 'afternoon'

    morning = PythonOperator(
        task_id='morning',
        python_callable=print_morning
    )

    afternoon = PythonOperator(
        task_id='afternoon',
        python_callable=print_afternoon
    )

    # Join task after the condition split
    def joined(**kwargs):
        return "Joined"

    join = PythonOperator(
        task_id='join',
        python_callable=joined,
        provide_context=True,
        trigger_rule=TriggerRule.ONE_SUCCESS
    )

    start >> fork >> [morning, afternoon] >> join
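To make the intended behavior of this DAG concrete, here is a plain-Python toy model of a single run. It is not Airflow code and not how Airflow's scheduler actually works; it only illustrates the idea that the branch callable returns the task_id to follow, the other branch ends up skipped, and join's trigger rule decides whether join runs. It also shows a simpler way to get the hour (datetime.now().hour). The function simulate_run and the optional now parameter are hypothetical additions for illustration.

```python
from datetime import datetime
from typing import Dict, Optional


def is_afternoon(now: Optional[datetime] = None) -> str:
    """Return the task_id of the branch to follow.

    `now` is a hypothetical parameter added so the logic can be tested;
    the original callable always used datetime.now().
    """
    if now is None:
        now = datetime.now()
    # datetime.hour is already an int, so no string parsing is needed
    return "morning" if now.hour < 12 else "afternoon"


def simulate_run(now: datetime, join_rule: str = "one_success") -> Dict[str, str]:
    """Toy model of one run of: start >> fork >> [morning, afternoon] >> join.

    Simplified illustration only, not Airflow's scheduling logic.
    """
    states = {"start": "success", "fork": "success"}
    chosen = is_afternoon(now)
    # The branch not returned by the callable is marked as skipped
    for branch in ("morning", "afternoon"):
        states[branch] = "success" if branch == chosen else "skipped"

    upstream = [states["morning"], states["afternoon"]]
    if join_rule == "one_success":
        # join runs if at least one upstream task succeeded
        states["join"] = "success" if "success" in upstream else "skipped"
    elif join_rule == "all_success":
        # Toy version of the default rule: any skipped upstream skips join
        all_ok = all(s == "success" for s in upstream)
        states["join"] = "success" if all_ok else "skipped"
    else:
        raise ValueError(f"unsupported trigger rule in this sketch: {join_rule}")
    return states


if __name__ == "__main__":
    print(simulate_run(datetime(2021, 6, 1, 9, 30)))
    print(simulate_run(datetime(2021, 6, 1, 15, 0), join_rule="all_success"))
```

Under these simplifying assumptions, the default all_success rule would leave join skipped whenever one branch is skipped, which is presumably why the DAG sets trigger_rule=TriggerRule.ONE_SUCCESS on join.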

I even checked out an older commit in which I am sure this code was working, but it still fails.

I am running Airflow in Docker, but with fixed images of Anaconda 3 and Postgres 11 (I am not just pulling the latest version from Docker Hub; I specify a precise version when creating the containers), so I don't see what could have broken everything overnight.


Answers (1)

bruno-uy


If Airflow is otherwise working as expected (i.e. the problem is specific to this DAG), the only problem I see is that you didn't connect start to fork, or fork to morning and afternoon:

# DAG dependencies definition
start >> fork >> [morning, afternoon] >> join
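A chain like this works even though Python lists do not define >>: when the left operand is a list, Python falls back to the right operand's reflected method, __rrshift__. The Node class below is a hypothetical stand-in that mimics this pattern (it is not Airflow's BaseOperator); it only records (upstream, downstream) edges in a module-level set so the resulting graph can be inspected.

```python
from typing import List, Set, Tuple, Union

# Hypothetical registry of (upstream_task_id, downstream_task_id) edges
EDGES: Set[Tuple[str, str]] = set()


class Node:
    """Hypothetical stand-in for a task; records edges created with >>."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id

    def __rshift__(self, other: Union["Node", List["Node"]]):
        # node >> node, or node >> [nodes]: self is upstream of each target
        targets = other if isinstance(other, list) else [other]
        for target in targets:
            EDGES.add((self.task_id, target.task_id))
        # Returning the right-hand side lets the chain continue
        return other

    def __rrshift__(self, other: List["Node"]) -> "Node":
        # [nodes] >> node: list has no __rshift__, so Python calls this instead
        for upstream in other:
            EDGES.add((upstream.task_id, self.task_id))
        return self


if __name__ == "__main__":
    start, fork, morning, afternoon, join = (
        Node(name) for name in ("start", "fork", "morning", "afternoon", "join")
    )
    start >> fork >> [morning, afternoon] >> join
    print(sorted(EDGES))
```

Each >> returns its right-hand side, so the expression is evaluated left to right as start >> fork, then fork >> [morning, afternoon], then [morning, afternoon] >> join.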

I also removed from textwrap import dedent because you're not using it, and with that the DAG works for me locally:

[Image: screenshot of the DAG running locally]
