Reputation: 321
I want to run a Python function in two situations: when the DAG runs, and when every task starts.
I know Airflow provides on_success_callback and on_failure_callback, but these fire only when a task succeeds or fails.
Upvotes: 1
Views: 3982
Reputation: 11
Add a PythonOperator as the initial task in your DAG. This way, you can run your custom action at the very start to signal that the DAG has begun. Of course, this doesn't give you a true DAG-level trigger, but it emulates one.
begin_task = PythonOperator(
    task_id="begin_task_demo",
    python_callable=_begin_dag_callback_method,
)
You can also add on_execute_callback to a task. It works just like on_failure_callback and on_success_callback, but it runs when the task is picked up and begins execution. If you add it to the very first task in your DAG, you will know when the DAG is triggered.
begin_task = PythonOperator(
    task_id="begin_task_demo",
    python_callable=_begin_dag_callback_method,
    on_execute_callback=_on_execute_call_back,
)
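The callback itself isn't defined above. Here is a rough sketch, assuming Airflow calls it with the task's context dictionary as its single argument, as it does for the success and failure callbacks. The message format, the return value, and the fake context are only there to make the sketch runnable outside Airflow.

```python
from types import SimpleNamespace


def _on_execute_call_back(context):
    """Runs just before the task starts executing."""
    ti = context["task_instance"]
    message = f"Starting task {ti.task_id} in DAG {ti.dag_id}"
    print(message)
    return message  # returned only so the sketch is easy to check


# Stand-in for the context Airflow would pass; not a real TaskInstance.
fake_context = {
    "task_instance": SimpleNamespace(task_id="begin_task_demo", dag_id="demo_dag")
}
execute_result = _on_execute_call_back(fake_context)
```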
When every task starts: this is easy to address with on_execute_callback.
begin_task = DummyOperator(
    task_id="dummy_task_demo",
    on_execute_callback=_on_execute_call_back,
)
Upvotes: 1
Reputation: 2591
Here are some of my thoughts:
When the DAG runs (meaning when someone triggers the DAG): this could be done by adding a task upstream of all the others.
When every task starts: try using a decorator.
Let's say you have a PythonOperator for a task
t = PythonOperator(
    task_id="copy_staging_" + t,
    provide_context=True,
    python_callable=my_function,
    dag=dag,
)
You can wrap the callable in a decorator and put the extra function call inside it, so that it runs before the actual function starts:
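import functools

def my_decorator(function):
    @functools.wraps(function)
    def wrapper(*args, **kwargs):
        # Runs before the task's actual callable
        print("I want to print extra stuff")
        return function(*args, **kwargs)
    return wrapper

# The decorator must be defined before it is applied
@my_decorator
def my_function(**kwargs):
    do_something()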
Upvotes: 0