Sandesh

Reputation: 438

Check if an Airflow task is running for the first time

I am writing an Airflow DAG with the Postgres operator.
I need to perform a one-time operation in each task if the task is running for the first time.
How can we do this?

Upvotes: 0

Views: 4601

Answers (4)

Maciej Mochol

Reputation: 1

If someone is looking for an answer in 2024: the prev_attempted_tries field of the task instance can be used.

Example using the TaskFlow API:

from airflow.decorators import task
from airflow.exceptions import AirflowException


@task
def test_task(ti=None):
    # prev_attempted_tries is 1 during the first attempt, so < 2 means first try
    if ti.prev_attempted_tries < 2:
        print('this task is executed for the first time')
    else:
        print(f'this task was previously executed {ti.prev_attempted_tries - 1} times already')

    # fail on purpose so the task is retried and the counter increments
    raise AirflowException('test')
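
For completeness, a minimal sketch of wiring this into a DAG (the retry_demo id and dates are illustrative, and .override() assumes Airflow 2.4+); retries must be enabled, otherwise the task is never re-attempted and prev_attempted_tries stays at 1:

from datetime import datetime

from airflow.decorators import dag


@dag(schedule=None, start_date=datetime(2024, 1, 1), catchup=False)
def retry_demo():
    # give the task retries so the deliberate failure above actually re-runs it
    test_task.override(retries=2)()


retry_demo()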

Upvotes: 0

Roshna Raj T M

Reputation: 11

I tried the other methods and they did not work for me when using Airflow 2.0.2 with the Bitnami Airflow Helm chart. The code block below helped me identify the first DAG run. get_task_instances() is a DAG class method, which you can find under https://github.com/apache/airflow/blob/main/airflow/models/dag.py. Note that the function here is taken from Airflow 2.0.2; make the respective changes for later versions if needed.

from datetime import timedelta

from airflow.operators.python import PythonOperator


def run_this_func(ds, **context):
    # look for task instances in the hour leading up to the previous execution date
    prev_task_instances = context["dag"].get_task_instances(
        start_date=context["prev_execution_date"] - timedelta(minutes=60),
        end_date=context["prev_execution_date"],
    )
    if not prev_task_instances:
        print("This is the first DAG run")
    else:
        print(ds)


run_this = PythonOperator(
    task_id='run_this',
    # provide_context is ignored with a warning in Airflow 2; context is always passed
    provide_context=True,
    python_callable=run_this_func,
    op_kwargs={
        "ds": "demo"
    },
    dag=dag,
)

The above code applies for a schedule_interval other than "@once". For a schedule_interval of "@once", change the if condition like this:

if context["prev_execution_date"] is None:
    print("This is the first DAG run")
else:
    print(ds)
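
For reference, a sketch combining both cases into a single callable (assuming the same Airflow 2.0.x context keys used above), so the check works regardless of schedule_interval:

def run_this_func(ds, **context):
    prev_execution_date = context["prev_execution_date"]
    # "@once" DAGs have no previous execution date at all
    if prev_execution_date is None:
        print("This is the first DAG run")
        return
    # otherwise look for task instances around the previous execution date
    prev_task_instances = context["dag"].get_task_instances(
        start_date=prev_execution_date - timedelta(minutes=60),
        end_date=prev_execution_date,
    )
    if not prev_task_instances:
        print("This is the first DAG run")
    else:
        print(ds)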

Upvotes: 0

Aramis NSR

Reputation: 1839

The optimal way to achieve this (I am trying to do the same with one of my merge DAGs, which depends on the previous run; on the first run there is no previous run, so it should be ignored) is to make your DAG use "pass parameters from the CLI" or "pass trigger configs in the GUI".

Follow these steps:

1. Create an if-else condition in your Python file that checks whether the parameter has been passed and chooses a different route accordingly (see the sketch after these steps). For example, you have key1 set to null; if it is null, the DAG should go to route B, but when you trigger it from the GUI or CLI you pass a value, so it can go to route A.

2. Use the code below as an example of how to access the value passed to the DAG:

# Airflow 1.x import path; in Airflow 2 use: from airflow.operators.python import PythonOperator
from airflow.operators.python_operator import PythonOperator


def run_this_func(ds, **kwargs):
    # dag_run.conf holds whatever was passed via the CLI -c flag or the GUI trigger form
    print("Remotely received value of {} for key=message".format(
        kwargs['dag_run'].conf['message']))


run_this = PythonOperator(
    task_id='run_this',
    provide_context=True,
    python_callable=run_this_func,
    dag=dag,
)

3. For the first run, pass the parameter using the CLI or the trigger technique I mentioned.

For example: airflow trigger_dag -c '{"key1":1, "key2":2}' dag_id (in Airflow 2: airflow dags trigger -c '{"key1":1, "key2":2}' dag_id)
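
For step 1, a minimal sketch of the branching using BranchPythonOperator (the route_a/route_b task ids and the key1 parameter are illustrative; the import path is the Airflow 1.x one, matching the snippets above):

from airflow.operators.python_operator import BranchPythonOperator


def choose_route(**kwargs):
    conf = kwargs['dag_run'].conf or {}
    # key1 absent or null means nothing was passed: treat this as the first run
    if conf.get('key1') is None:
        return 'route_b'
    # a value was passed via the CLI/GUI trigger, so take the normal path
    return 'route_a'


branch = BranchPythonOperator(
    task_id='branch',
    provide_context=True,
    python_callable=choose_route,
    dag=dag,
)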

Upvotes: 0

Peter Carr

Reputation: 11

If knowing whether it's the first DAG run is good enough to infer that it's the first task run as well, then you could do the following:

Make a subclass of the PostgresOperator and do something like what's done with provide_context in the PythonOperator class and its __init__ code.

This will allow you to pass the execution macros to your task as a dictionary in kwargs (in an argument called context in the cited PythonOperator example above). You can then access 'prev_ds' or one of the other 'prev_'-prefixed macros from the templates reference.

If the value is None for any of these 'prev_'-prefixed items, then you know it's the first DAG run (and very likely the first task run).

In your subclass you can override the execute function like this:

def execute(self, context):
    if context['prev_ds'] is None:
        # do whatever you want to do on the first run here
        pass
    return super().execute(context)
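
A fuller sketch of such a subclass (FirstRunAwarePostgresOperator is a hypothetical name, and the import path assumes the postgres provider package; adjust it for your Airflow version):

from airflow.providers.postgres.operators.postgres import PostgresOperator


class FirstRunAwarePostgresOperator(PostgresOperator):
    """PostgresOperator that performs a one-time action on the first DAG run."""

    def execute(self, context):
        # 'prev_ds' is None when there is no previous scheduled run
        if context['prev_ds'] is None:
            # do whatever you want to do on first run here,
            # e.g. run a one-time CREATE TABLE statement
            pass
        return super().execute(context)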

Upvotes: 1
