I am trying to create a dynamic task list that checks whether the previous batch runs for the day have completed. To do that, I store the run times (HHMM) in an Airflow Variable, use datetime.now() to get the current HHMM, and build a list of the previous runs from it. But because Airflow re-parses the DAG file repeatedly, datetime.now() returns the latest date and time on every parse, so a new list of previous tasks is generated each time.
Instead of comparing against datetime.now(), I tried using the {{ ds }} and {{ ts }} default Airflow variables to avoid this issue. But they are either treated as plain strings or not recognized as variables at all, and I get an error saying ts/ds is not defined.
Is there a way or workaround to access these variables outside of the operators? The logic below builds a list of dynamic tasks that check whether the previous batch runs have completed.
Thanks in advance.
from datetime import datetime,timedelta,date
from pytz import timezone, utc
import pendulum
## Below would come from Airflow variable.
dag_times = ["0700", "0715", "0730" ,"0730", "0930","1130","1330","1630","2000"]
## Get the current time. This keeps changing every time Airflow parses the DAG.
current_dag_time = datetime.now().astimezone(timezone('US/Pacific')).strftime('%H%M')
schedule_run_time = min(dag_times, key=lambda x:abs(int(x)-int(current_dag_time)))
current_run = dag_times.index(schedule_run_time)
print("current_run",current_run)
intra_day_time = dag_times[dag_times.index(schedule_run_time)-1] if current_run > 0 else schedule_run_time
previous_runs = []
if current_run > 0:
    # print(dag_times.index(schedule_run_time))
    previous_runs = dag_times[0:dag_times.index(schedule_run_time)]
else:
    previous_runs.append(dag_times[-1])
previous_run_tasks=[]
for dag_name in previous_runs:
    item = {}
    if int(dag_name) == 0:
        if date.today().weekday() == 0 :
            start_time =-52
            end_time = 4
        else:
            start_time =-24
            end_time = 24
        # poke_task_name = "SAMPLE_BOX_%s" % dag_name
        item = {"poke_task_name": "SAMPLE_BOX_%s" % dag_name, "start_time":start_time, "end_time":end_time}
    elif int(dag_name) > 0 :
        start_time =0
        end_time = 24
        poke_task_name = "SAMPLE_BOX_%s" % dag_name
        item = {"poke_task_name": "SAMPLE_BOX_%s" % dag_name, "start_time":start_time, "end_time":end_time}
    else:
        print("error")
    previous_run_tasks.append(item)
print(previous_run_tasks)
if int(schedule_run_time) == 0:
    if date.today().weekday() == 0 :
        start_time =-52
        end_time = 4
    else:
        start_time =-24
        end_time = 24
    poke_task_name = "SAMPLE_BOX_%s" % dag_times[-1]
    generate_task_name = "SAMPLE_BOX_%s" % schedule_run_time
elif int(schedule_run_time) > 0 :
    start_time =0
    end_time = 24
    poke_task_name = "SAMPLE_BOX_%s" % intra_day_time
    generate_task_name = "SAMPLE_BOX_%s" % schedule_run_time
else:
    print("error")
print("start_time::::",start_time)
print("end_time::::",end_time)
print("generate_task_name::::",generate_task_name)
print("poke_task_name::::",poke_task_name)
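A minimal sketch of the slot-selection step above, with the run time passed in as a parameter instead of read from datetime.now(), so the same input always yields the same list. Because ds/ts only exist at run time, a function like this would have to be called from inside a task rather than at module level. The names `to_minutes` and `previous_slots` are hypothetical. Unlike the code above, this sketch measures distance in minutes since midnight rather than comparing raw HHMM integers (where 0759 and 0800 differ by 41), and it drops the duplicate "0730" entry from the list.

```python
def to_minutes(hhmm):
    """Convert an 'HHMM' string to minutes since midnight."""
    return int(hhmm[:2]) * 60 + int(hhmm[2:])


def previous_slots(run_hhmm, dag_times):
    """Return (nearest slot, earlier slots) for the slot closest to run_hhmm.

    Mirrors the question's logic: if the nearest slot is the first of the day,
    the "previous" run is taken to be the last slot of the prior day.
    """
    run = to_minutes(run_hhmm)
    nearest = min(dag_times, key=lambda t: abs(to_minutes(t) - run))
    idx = dag_times.index(nearest)
    earlier = dag_times[:idx] if idx > 0 else [dag_times[-1]]
    return nearest, earlier


dag_times = ["0700", "0715", "0730", "0930", "1130", "1330", "1630", "2000"]
print(previous_slots("0928", dag_times))
# ('0930', ['0700', '0715', '0730'])
print(previous_slots("0650", dag_times))
# ('0700', ['2000'])
```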
These Airflow default variables are only instantiated in the context of a task instance for a given DAG run, and thus they are only available in the templated fields of each operator. Trying to use them outside of this context will not work.
I have prepared a simple DAG with a task that displays the execution date (ds) as a parameter:
from airflow import models
# Airflow 1.10-style import; in Airflow 2 use: from airflow.operators.bash import BashOperator
from airflow.operators import bash_operator
import datetime

# Start date: midnight yesterday.
yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_args = {
    "start_date": yesterday,
    "retries": 1,
    "email_on_failure": False,
    "email_on_retry": False,
    "email": "[email protected]"
}

with models.DAG(
        'printing_the_execution_date_ts',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_args) as dag:
    # bash_command is a templated field, so {{ ds }} is rendered for each DAG run.
    printing_the_execution_date = bash_operator.BashOperator(
        task_id="display",
        bash_command="echo {{ ds }}"
    )

    printing_the_execution_date
The {{ }} brackets tell Airflow that this is a Jinja template.
You may also use the ts variable, which is the execution date in ISO 8601 format. Thus, in the DAG run stamped with 2020-05-10, these would render as:
'echo {{ ds }}' renders to: echo 2020-05-10
'echo {{ ts }}' renders to: echo 2020-05-10T00:00:00+00:00
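To see where the two formats come from, here is a stdlib-only illustration for a run whose logical date is midnight UTC on 2020-05-10. It only reproduces the string shapes of ds and ts; it is not Airflow's own rendering code.

```python
from datetime import datetime, timezone

# Logical (execution) date of the example run, in UTC.
logical_date = datetime(2020, 5, 10, tzinfo=timezone.utc)

ds = logical_date.strftime("%Y-%m-%d")  # same shape as {{ ds }}
ts = logical_date.isoformat()           # same shape as {{ ts }}

print("echo " + ds)  # echo 2020-05-10
print("echo " + ts)  # echo 2020-05-10T00:00:00+00:00
```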
I recommend taking a look at this Stack Overflow thread, which includes an example that uses PythonOperator.
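A minimal sketch of that approach, doing the check at run time instead of building the list at parse time. The callable is plain Python and receives the task context as keyword arguments (in Airflow 2 this happens automatically for `**kwargs`; in Airflow 1.10 it needs `provide_context=True` on the PythonOperator). The function name and `DAG_TIMES` are hypothetical, modeled on the question. Note that for a scheduled run, `ts` is the logical (execution) date, which marks the start of the schedule interval rather than the wall-clock time the task runs, so check which timestamp the comparison actually needs. No timezone conversion is done here; the question's US/Pacific handling would need to be added.

```python
from datetime import datetime

# Hypothetical schedule slots; in the question these come from an Airflow Variable.
DAG_TIMES = ["0700", "0715", "0730", "0930", "1130", "1330", "1630", "2000"]


def check_previous_runs(**context):
    """Return the task names for the slots earlier than this run's time.

    context["ts"] holds the same value that {{ ts }} renders to
    in a templated field.
    """
    run_time = datetime.fromisoformat(context["ts"])
    hhmm = run_time.strftime("%H%M")
    # Zero-padded HHMM strings sort in time order, so string comparison works.
    earlier = [t for t in DAG_TIMES if t < hhmm]
    return ["SAMPLE_BOX_%s" % t for t in earlier]


# In the DAG file this would be wired up roughly as (Airflow 2 import path):
#   from airflow.operators.python import PythonOperator
#   PythonOperator(task_id="check_previous_runs", python_callable=check_previous_runs)

# Outside Airflow, the callable can be exercised with a hand-built context:
print(check_previous_runs(ts="2020-05-10T09:30:00+00:00"))
# ['SAMPLE_BOX_0700', 'SAMPLE_BOX_0715', 'SAMPLE_BOX_0730']
```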