Reputation: 51
I'm new to Airflow and trying to figure out how to pass the DAG run date to each task. I have this in my DAG:
_tzinfo = tz.gettz('America/Los_Angeles')
dag_run_date = datetime.now(_tzinfo)
dag = DAG(
    'myDag',
    default_args=default_args,
    schedule_interval=None,
    params={
        "runDateTimeTz": dag_run_date.strftime("%Y-%m-%dT%H:%M:%S.%f%z")
    }
)
Then I try to pass the runDateTimeTz parameter to each of my tasks, something like this:
task1 = GKEPodOperator(
    image='gcr.io/myJar:1.0.1.45',
    cmds=['java'],
    arguments=["-jar", "myJar.jar", "{{ params.runDateTimeTz }}"],
    dag=dag)
task2 = GKEPodOperator(
    image='gcr.io/myJar2:1.0.1.45',
    cmds=['java'],
    arguments=["-jar", "myJar2.jar", "{{ params.runDateTimeTz }}"],
    dag=dag)
My tasks execute correctly, but I expected all of them to receive the same run date in params.runDateTimeTz. That did not happen: for example, task1 gets params.runDateTimeTz=2020-04-16T07:42:47.412716-07:00 and task2 gets params.runDateTimeTz=2020-04-16T07:43:29.913289-07:00.
I suppose this behavior is related to the way Airflow fills the params for the DAG. It looks like params.runDateTimeTz is only computed when each task starts to run, but I want to compute it beforehand and send it to each task as an argument, so that all tasks get the same value.
Can someone tell me what I'm doing wrong?
Upvotes: 5
Views: 7845
Reputation: 18904
You can use execution_date or ds from Airflow Macros:
Details: https://airflow.apache.org/docs/stable/macros-ref#default-variables
task1 = GKEPodOperator(
    image='gcr.io/myJar:1.0.1.45',
    cmds=['java'],
    arguments=["-jar", "myJar.jar", "{{ ds }}"],
    dag=dag)
task2 = GKEPodOperator(
    image='gcr.io/myJar2:1.0.1.45',
    cmds=['java'],
    arguments=["-jar", "myJar2.jar", "{{ ds }}"],
    dag=dag)
If you need a timestamp, you can use {{ ts }}.
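To illustrate why the macros give every task the same value, here is a rough sketch in plain Python (no Airflow required). It assumes that the DAG file is parsed again for each task, so any datetime.now() at module level is evaluated again each time, while the execution date is fixed once per DAG run. The helpers params_at_parse and render_macros and the sample dates are made up for this illustration; they are not Airflow APIs. The ds and ts formats follow the macros reference (YYYY-MM-DD and an ISO 8601 timestamp).

```python
from datetime import datetime, timezone


def params_at_parse(parse_time):
    # Hypothetical stand-in for module-level code in the DAG file:
    # the value depends on when the file happens to be parsed.
    return {"runDateTimeTz": parse_time.strftime("%Y-%m-%dT%H:%M:%S.%f%z")}


def render_macros(execution_date):
    # Hypothetical stand-in for template rendering: both values are
    # derived from the run's execution date, not from the wall clock.
    return {
        "ds": execution_date.strftime("%Y-%m-%d"),
        "ts": execution_date.isoformat(),
    }


# Two parses of the same DAG file, roughly 40 seconds apart (sample values).
first_parse = params_at_parse(
    datetime(2020, 4, 16, 14, 42, 47, 412716, tzinfo=timezone.utc))
second_parse = params_at_parse(
    datetime(2020, 4, 16, 14, 43, 29, 913289, tzinfo=timezone.utc))
print(first_parse["runDateTimeTz"] == second_parse["runDateTimeTz"])  # False

# One execution date per DAG run, shared by every task in that run.
execution_date = datetime(2020, 4, 16, 14, 42, 47, tzinfo=timezone.utc)
task1_args = render_macros(execution_date)
task2_args = render_macros(execution_date)
print(task1_args == task2_args)  # True
print(task1_args["ds"])          # 2020-04-16
print(task1_args["ts"])          # 2020-04-16T14:42:47+00:00
```

With schedule_interval=None the run is triggered manually, so the execution date should correspond to the trigger time, which is close to what the question was trying to capture with datetime.now().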
Upvotes: 5