Reputation: 438
I am writing an Airflow DAG with the PostgresOperator.
I need to perform a one-time operation in each task if the task is running for the first time.
How can I do this?
Upvotes: 0
Views: 4601
Reputation: 1
If someone is looking for an answer in 2024 - the prev_attempted_tries field of the task instance can be used.
Example using Taskflow API:
from airflow.decorators import task
from airflow.exceptions import AirflowException

@task
def test_task(ti=None):
    if ti.prev_attempted_tries < 2:
        print('this task is executed for the first time')
    else:
        print(f'this task was previously executed {ti.prev_attempted_tries - 1} times already')
    # fail on purpose so the task is retried and the second branch can run
    raise AirflowException('test')
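For the second branch to ever run, the task needs retries configured. A minimal sketch of the wiring, assuming a recent Airflow 2.x and a made-up DAG id first_run_demo:
from datetime import datetime
from airflow.decorators import dag

@dag(schedule=None, start_date=datetime(2024, 1, 1),
     catchup=False, default_args={'retries': 1})
def first_run_demo():
    # default_args gives the decorated task one retry, so the
    # AirflowException above leads to a second attempt
    test_task()

first_run_demo()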
Upvotes: 0
Reputation: 11
I tried the other methods and they did not work for me when using Airflow 2.0.2 with the Bitnami Airflow Helm chart. The code block below helped me identify the first DAG run. get_task_instances() is a DAG class method which you can find under https://github.com/apache/airflow/blob/main/airflow/models/dag.py. Note that this was written against Airflow 2.0.2; make the respective changes for later versions if needed.
from datetime import timedelta
from airflow.operators.python import PythonOperator

def run_this_func(ds, **context):
    # prev_execution_date is the previous schedule tick (even if no run
    # happened then), so check whether any task instances exist in the
    # hour leading up to it
    prev_task_instances = context["dag"].get_task_instances(
        start_date=context["prev_execution_date"] - timedelta(minutes=60),
        end_date=context["prev_execution_date"]
    )
    if not prev_task_instances:
        print("This is the first DAG run")
    else:
        print(ds)

run_this = PythonOperator(
    task_id='run_this',
    provide_context=True,
    python_callable=run_this_func,
    op_kwargs={
        "ds": "demo"
    },
    dag=dag,
)
The above code applies for a schedule_interval other than "@once". For schedule_interval "@once", change the if condition like this:
if context["prev_execution_date"] is None:
    print("This is the first DAG run")
else:
    print(ds)
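For reference, a sketch of the full callable with that condition, using the same PythonOperator wiring as above:
def run_this_func(ds, **context):
    # "@once" has no previous schedule tick, so prev_execution_date is
    # None exactly on the first run
    if context["prev_execution_date"] is None:
        print("This is the first DAG run")
    else:
        print(ds)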
Upvotes: 0
Reputation: 1839
The optimal way to achieve this (I am trying to achieve this with one of my merge DAGs that depends on the previous run, which does not exist yet and so should be ignored on the first run) is to make your DAG use "pass parameters from the CLI" or "pass trigger configs in the GUI".
Follow these steps:
1- Create an if-else condition in your Python file to check whether the parameter has been passed, and choose a different route accordingly. For example, you have key1 set to null; if it is null, the DAG takes route B, but when you run it with a trigger or the CLI and pass a value, it takes route A (see the sketch after these steps).
2- Use the code below as an example of how to access the value passed to the DAG:
def run_this_func(ds, **kwargs):
    print("Remotely received value of {} for key=message".format(
        kwargs['dag_run'].conf['message']))

run_this = PythonOperator(
    task_id='run_this',
    provide_context=True,
    python_callable=run_this_func,
    dag=dag,
)
3- For the first run, use the pass-parameter or trigger technique I've mentioned, for example: airflow trigger_dag -c '{"key1":1, "key2":2}' dag_id
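As a minimal sketch of the routing from step 1 (the callable name choose_route is hypothetical; it assumes the value arrives in dag_run.conf as shown in step 2):
def choose_route(**kwargs):
    # conf is only populated for triggered runs; a missing or null key1
    # means the first-run case (route B)
    conf = kwargs["dag_run"].conf or {}
    if conf.get("key1") is None:
        print("route B: no key1 passed, treating this as the first run")
    else:
        print("route A: key1 = {}".format(conf["key1"]))
Note that trigger_dag is the Airflow 1.x CLI; on Airflow 2.x the equivalent is airflow dags trigger -c '{"key1":1, "key2":2}' dag_id.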
Upvotes: 0
Reputation: 11
If knowing it's the first DAG run is good enough to infer it's the first task run as well, then you could do the following: make a subclass of the PostgresOperator and do something like what's done with provide_context in the PythonOperator class and its init code.
This will allow you to pass the execution macros to your task as a dictionary in kwargs (in an argument called context in the PythonOperator example cited above). You can then access 'prev_ds' or one of the other 'prev_'-prefixed macros listed in the Airflow macros reference.
If the value is None for any of these 'prev_'-prefixed items, then you know it's the first DAG run (and very likely the first task run).
In your subclass you can override the execute function like this:
def execute(self, context):
    if context['prev_ds'] is None:
        # DO WHATEVER YOU WANT TO DO ON FIRST RUN HERE
        pass
    super().execute(context)
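Putting it together, a minimal sketch of such a subclass (the class name FirstRunPostgresOperator is made up, and the import path assumes the Airflow 2 postgres provider package):
from airflow.providers.postgres.operators.postgres import PostgresOperator

class FirstRunPostgresOperator(PostgresOperator):
    def execute(self, context):
        if context['prev_ds'] is None:
            # first DAG run: perform the one-time operation here
            self.log.info("First run detected, running one-time setup")
        super().execute(context)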
Upvotes: 1