user3531900
user3531900

Reputation: 321

Airflow callbacks for tasks

I want to run a python function in two situations

  1. When Dag runs means when someone triggers the dag
  2. When every task starts

I know airflow provides the on_success_callback and on_failure_callback but these are when task completes or fails.

Upvotes: 1

Views: 3982

Answers (2)

Saami Siddiqui
Saami Siddiqui

Reputation: 11

  1. When Dag runs means when someone triggers the dag. To address this, I would suggest two options.
  • Add a PythonOperator as the initial task in your dag. This way, you can do your custom action at the very start to signal the start of the DAG. Of course, this doesn't give you a specific DAG level trigger, but it sort of emulates that.

         begin_task = PythonOperator(
            task_id="begin_task_demo",
            python_callable=_begin_dag_callback_method)
    
  • For every task in your DAG, add the on_execute_callback. This works just like on_failure and on_success callbacks but it is executed when a task is picked up and begins execution. If you add this to the very first task in your DAG, you will know when the DAG is triggered.

        begin_task = PythonOperator(
            task_id="begin_task_demo",
            python_callable=_begin_dag_callback_method,
            on_execute_callback=_on_execute_call_back)
    
  1. When every task starts Addressing this is easy with the on_execute_callback

    begin_task = DummyOperator(
    task_id="dummy_task_demo",
    on_execute_callback=_on_execute_call_back)
    
    

Upvotes: 1

Chengzhi
Chengzhi

Reputation: 2591

Here are some of my thoughts:

  • When Dag runs means when someone triggers the dag: maybe this can be done by adding a task as upstream

  • When every task starts: try use the decorator
    Let's say you have a PythonOperator for a task

    t = PythonOperator(
         task_id="copy_staging_" + t,
         provide_context=True,
         python_callable=my_function,
         dag=dag)
    

You can use the decorator something like this and define that function call before the actual function starts in the decorator

@my_decorator 
def my_function(**kwargs): 
    do_something()


def my_decorator(function):
    def my_function():
        print("I want to print extra stuff")
        function()
    return my_function

Upvotes: 0

Related Questions