Reputation: 627
I'm using a parameter that is the timestamp in a set of tasks:
default_dag_args = {'arg1': 'arg1-value',
                    'arg2': 'arg2-value',
                    'now': datetime.now()}
I would like the now parameter to have the same value for all the tasks, but datetime.now() is re-executed for each task function.
Is there a way of doing this (executing it once and using the same value throughout the DAG)? I'm using the TaskFlow API for Airflow 2.0:
from airflow.decorators import task
from airflow.operators.python import get_current_context

@task
def python_task():
    context = get_current_context()
    context_dag = context['dag']
    now = context_dag.default_args['now']
    print(now)
Upvotes: 0
Views: 388
Reputation: 21
When a worker gets a task instance to run, it rebuilds the whole DagBag from the Python files to get the DAG and task definitions. So every time a task instance is run, your DAG file is sourced, rerunning your DAG definition code. The resulting DAG object is the one that defines that particular task instance.
It's critical to understand that the DAG definition is not simply built once for every execution date and then persisted/reused for all TIs within that DagRun. The DAG definition is constantly being recomputed from your Python code, and each TI is run in a separate process, independently and without state from other tasks. Thus, if your DAG definition includes non-deterministic results at DagBag build time, such as datetime.now(), every instantiation of your DAG, even for the same execution date, will have different values. You need to build your DAGs in a deterministic and idempotent manner.
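The re-parsing behavior can be imitated without Airflow. The following is only a rough sketch: parse_dag_file and DAG_FILE_SOURCE are hypothetical stand-ins for a worker re-importing the DAG file, not Airflow APIs. It shows that a module-level datetime.now() yields a new value on every parse, while the constant arguments stay the same.

```python
import time
from datetime import datetime

# Hypothetical stand-in for the contents of a DAG file.
DAG_FILE_SOURCE = """
from datetime import datetime
default_dag_args = {'arg1': 'arg1-value', 'now': datetime.now()}
"""

def parse_dag_file(source):
    """Imitate a worker re-executing the DAG file's top-level code."""
    namespace = {}
    exec(source, namespace)
    return namespace['default_dag_args']

# Each task instance triggers its own parse of the file.
first_parse = parse_dag_file(DAG_FILE_SOURCE)
time.sleep(0.05)  # some time passes before the next task starts
second_parse = parse_dag_file(DAG_FILE_SOURCE)

# Constant values match across parses; the timestamp does not.
print(first_parse['arg1'] == second_parse['arg1'])
print(first_parse['now'] == second_parse['now'])
```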
The only way to share non-deterministic results is to store them in the DB and have your tasks fetch them, as @sezai-burak-kantarcı has pointed out. Best practice is to use task context-specific variables, like {{ ds }}, {{ execution_date }}, {{ data_interval_start }}, etc. These are the same for all tasks within a DAG run. You can see the available template variables here: Airflow templates reference
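As a minimal sketch of reading such a run-level value inside a task: the helper run_timestamp and the fake_context dict below are hypothetical and exist only so the example runs without Airflow. It assumes the context exposes the ts key (the ISO 8601 string behind {{ ts }}); inside a real TaskFlow task you would pass get_current_context() instead of the fake dict.

```python
from datetime import datetime

def run_timestamp(context):
    """Return the DAG run's timestamp from a task context mapping.

    Assumes the 'ts' key, the ISO 8601 string Airflow exposes as {{ ts }}.
    """
    return datetime.fromisoformat(context["ts"])

# Inside a TaskFlow task this would look like (requires Airflow):
#   @task
#   def python_task():
#       now = run_timestamp(get_current_context())

# Hypothetical stand-in for the context two tasks of the same DAG run receive.
fake_context = {"ts": "2022-05-02T00:00:00+00:00"}
task_a_now = run_timestamp(fake_context)
task_b_now = run_timestamp(fake_context)
print(task_a_now == task_b_now)
```

Because the value comes from the DAG run rather than from the wall clock at parse time, every task in the run derives the same timestamp.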
Upvotes: 0
Reputation: 225
I tried setting the time as a constant at the start of the DAG file, like:
TIME = datetime.now()
and got the context inside the tasks with get_current_context(), just like you did.
Sadly, I think that because the DAG file is run from the start for each task, the time was recalculated every time.
One idea I have is to use XComs to save the datetime in one task and pull it into the other tasks.
My sample code is below; I think you'll get the idea.
from airflow.decorators import task, dag
from datetime import datetime
import time

default_arguments = {
    'owner': 'admin',
    # This is the beginning, for more see: https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    'start_date': datetime(2022, 5, 2)
}

@dag(
    schedule_interval=None,
    dag_id="Time_Example_Dag",
    default_args=default_arguments,
    catchup=False,
)
def the_global_time_checker_dag():

    @task
    def time_set():
        # To use XCom to pass the value between tasks,
        # we have to convert the datetime to a string.
        # isoformat() round-trips with fromisoformat() even when microseconds are 0,
        # whereas str() drops the fractional part in that case.
        now = datetime.now().isoformat()
        return now

    @task
    def starting_task(datetime_string):
        important_number = 23
        # We can use this datetime object in whatever way we like.
        date_time_obj = datetime.fromisoformat(datetime_string)
        print(date_time_obj)
        return important_number

    @task
    def important_task(datetime_string, number):
        # Passing some time
        time.sleep(10)
        # Again, we are free to do whatever we want with this object.
        date_time_obj = datetime.fromisoformat(datetime_string)
        print(date_time_obj)
        print("The important number is: {}".format(number))

    # time_set() runs once per DAG run; its XCom result feeds both downstream tasks.
    time_right_now = time_set()
    start = starting_task(datetime_string=time_right_now)
    important = important_task(datetime_string=time_right_now, number=start)

time_checker = the_global_time_checker_dag()
In the logs, you can see that all the datetime values are the same.
For more information about XComs in the TaskFlow API, you can check here.
Upvotes: 1