Matheus Oliveira

Reputation: 627

How to avoid dynamic execution of expression in dag parameter at Airflow?

I'm using a parameter that is the timestamp in a set of tasks:

from datetime import datetime

default_dag_args = {'arg1': 'arg1-value',
                    'arg2': 'arg2-value',
                    'now': datetime.now()}

I would like the now parameter to have the same value for all the tasks, but it is re-evaluated for each task.

Is there a way to evaluate it once and use the same value throughout the DAG? I'm using the TaskFlow API in Airflow 2.0:

    from airflow.decorators import task
    from airflow.operators.python import get_current_context

    @task
    def python_task():
        context = get_current_context()
        context_dag = context['dag']
        now = context_dag.default_args['now']

        print(now)

Upvotes: 0

Views: 388

Answers (2)

kyle andelin

Reputation: 21

When a worker gets a task instance to run, it rebuilds the whole DagBag from the Python files to get the DAG and task definitions. So every time a task instance is run, your DAG file is sourced and your DAG definition code runs again. That resulting DAG object is the one that defines that particular task instance.

It's critical to understand that the DAG definition is not simply built once per execution date and then persisted and reused for all TIs within that DagRun. The DAG definition is constantly recomputed from your Python code, and each TI runs in a separate process, independently and without state from other tasks. Thus, if your DAG definition produces non-deterministic results at DagBag build time (such as datetime.now()), every instantiation of your DAG, even for the same execution date, will have different values. You need to build your DAGs in a deterministic and idempotent manner.
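To see the effect without Airflow, here is a minimal pure-Python model of the re-parsing described above. It assumes a hypothetical DAG file that contains only a module-level datetime.now(), and uses runpy.run_path to execute that file from scratch twice, the way each task instance re-sources the DAG file. It is a simplification (both "parses" happen in one process), not how Airflow actually loads DAGs.

```python
import runpy
import tempfile
import time
from pathlib import Path

# Hypothetical DAG file reduced to the problematic line: a module-level
# datetime.now() that is evaluated every time the file is executed.
DAG_SOURCE = "from datetime import datetime\nNOW = datetime.now()\n"


def parse_dag_file(path):
    """Execute the file from scratch, like a worker re-sourcing the DAG file."""
    module_globals = runpy.run_path(str(path))
    return module_globals["NOW"]


with tempfile.TemporaryDirectory() as tmp:
    dag_file = Path(tmp) / "example_dag.py"
    dag_file.write_text(DAG_SOURCE)
    first = parse_dag_file(dag_file)   # models the parse for task instance 1
    time.sleep(0.05)                   # the next task starts a bit later
    second = parse_dag_file(dag_file)  # models the parse for task instance 2

# The two "tasks" see different values for the same module-level constant.
print(first, second, first == second)
```

Any value computed at parse time behaves the same way, including one placed in default_args.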

The only way to share non-deterministic results is to store them in the DB and have your tasks fetch them, as @sezai-burak-kantarcı has pointed out. Best practice is to use task-context variables such as {{ ds }}, {{ execution_date }}, {{ data_interval_start }}, etc. These are the same for all tasks within a DAG run. You can see the available template variables here: Airflow templates reference
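Here is a minimal pure-Python sketch of why run-level variables avoid the problem. It assumes a hypothetical RUN_LOGICAL_DATE standing in for the date Airflow fixes when the DagRun is created, and build_context only approximates how ds, ts and ts_nodash are formatted from it. Because every task derives its value from that one persisted date instead of calling datetime.now(), separate task instances agree.

```python
from datetime import datetime, timezone

# Hypothetical stand-in for the run's logical (execution) date. It is fixed
# when the DagRun is created, not recomputed each time the DAG file is parsed.
RUN_LOGICAL_DATE = datetime(2022, 5, 2, tzinfo=timezone.utc)


def build_context(logical_date):
    """Approximate how a few template variables are derived from the run date."""
    return {
        "ds": logical_date.strftime("%Y-%m-%d"),
        "ts": logical_date.isoformat(),
        "ts_nodash": logical_date.strftime("%Y%m%dT%H%M%S"),
    }


def extract_task(context):
    # The task reads the run-level value instead of calling datetime.now().
    return context["ts"]


def load_task(context):
    return context["ts"]


# Each task instance builds its own context (as if in its own process),
# but both contexts come from the same persisted run date.
extract_ts = extract_task(build_context(RUN_LOGICAL_DATE))
load_ts = load_task(build_context(RUN_LOGICAL_DATE))
print(extract_ts, load_ts)  # both 2022-05-02T00:00:00+00:00
```

In a real TaskFlow task the same values come from the context, for example get_current_context()["ts"], or from templated fields such as {{ ts }}. For a manually triggered run (schedule_interval=None), the logical date is typically set to the trigger time, so it can serve as a "now" that is fixed once per run.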

Upvotes: 0

Sezai Burak Kantarcı

Reputation: 225

I tried setting the time as a constant at the start of the DAG file, like:

TIME = datetime.now()

and got the context inside the tasks with get_current_context(), just like you did.

Sadly, I think that because the DAG file is run from the start each time, the time was recalculated for every task defined in the script.

One idea I have is to use XComs: save the datetime in one task and pull it into the other tasks.

My sample code is below; I think you'll get the idea.

from airflow.decorators import task, dag
from datetime import datetime
import time

default_arguments = {
    'owner': 'admin',
    # This is the beginning, for more see: https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    'start_date': datetime(2022, 5, 2)
}

@dag(
    schedule_interval=None,
    dag_id="Time_Example_Dag",
    default_args=default_arguments,
    catchup=False,
)
def the_global_time_checker_dag():

    @task
    def time_set():
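        # To pass the value between tasks through XCom,
        # we convert the datetime to a string. strftime with %f always
        # includes microseconds, so the string matches the strptime format below.
        now = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
        return now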

    @task
    def starting_task(datetime_string):
        important_number = 23
        # We can use this datetime object in whatever way we like.
        date_time_obj = datetime.strptime(datetime_string, '%Y-%m-%d %H:%M:%S.%f')
        print(date_time_obj)
        return important_number

    @task
    def important_task(datetime_string, number):
        # Let some time pass
        time.sleep(10)
        # Again, we are free to do whatever we want with this object.
        date_time_obj = datetime.strptime(datetime_string, '%Y-%m-%d %H:%M:%S.%f')
        print(date_time_obj)
        print("The important number is: {}".format(number))

    time_right_now = time_set()
    start = starting_task(datetime_string=time_right_now)
    important = important_task(datetime_string=time_right_now, number=start)

time_checker = the_global_time_checker_dag()

In the logs, you can see that all the datetime values are the same.
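The sample passes the datetime through XCom as a string and parses it back with the '%Y-%m-%d %H:%M:%S.%f' format, so the string side must always match that format. A small standard-library check of that round trip: str() on a datetime drops the fractional part when microsecond is 0, which makes the '.%f' parse fail, while strftime with %f (or the isoformat/fromisoformat pair) round-trips cleanly. The whole_second value is just an illustrative timestamp.

```python
from datetime import datetime

FMT = "%Y-%m-%d %H:%M:%S.%f"  # the format the sample's strptime calls expect

# An illustrative timestamp that lands exactly on a whole second.
whole_second = datetime(2022, 5, 2, 12, 0, 0)


def parses(text, fmt=FMT):
    """Return True if text can be parsed with fmt."""
    try:
        datetime.strptime(text, fmt)
        return True
    except ValueError:
        return False


# str() omits the fractional seconds when microsecond == 0 ...
as_str = str(whole_second)     # '2022-05-02 12:00:00'
str_ok = parses(as_str)        # ... so the '.%f' parse fails: False

# strftime with %f always writes six fractional digits, so it round-trips.
as_fmt = whole_second.strftime(FMT)  # '2022-05-02 12:00:00.000000'
fmt_round_trip = datetime.strptime(as_fmt, FMT)

# isoformat()/fromisoformat() is another pair that round-trips (Python 3.7+).
iso_round_trip = datetime.fromisoformat(whole_second.isoformat())
```

Any datetime.now() call that happens to hit microsecond 0 would trigger the str() failure, so it is rare but possible.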

For more information about XComs in the TaskFlow API, you can check here.

Upvotes: 1
