In Airflow how can I pass parameters using context to on_success_callback function handler?
This is my test code:
import airflow
from airflow import DAG
from airflow.operators import MSTeamsWebhookOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime
from transaction_analytics import helpers
from airflow.utils.helpers import chain

# Parameters & variables
schedule_interval = "0 20 * * *"

def _task_success_callback(context):
    dagid = context["task_instance"].dag_id
    duration = context["task_instance"].duration
    executiondate = context["execution_date"]
    # Workaround until we configure Airflow's base URL
    logurl = context["task_instance"].log_url.replace("localhost", "agbqhsbldd017v.agb.rbxd.ds")
    pp1 = context["params"].param1
    #pp1 = "{{ params.param1 }}"
    ms_teams_op = MSTeamsWebhookOperator(
        task_id="success_notification",
        http_conn_id="msteams_airflow",
        message="DAG {param1} `{dag}` finished successfully!".format(dag=dagid, param1=pp1),
        subtitle="Execution Date = {p1}, Duration = {p2}".format(p1=executiondate, p2=duration),
        button_text="View log",
        button_url="{log}".format(log=logurl),
        theme_color="00FF00",
        #proxy="http://10.72.128.202:3128"
    )
    ms_teams_op.execute(context)
main_dag = DAG('test_foley',
               schedule_interval=schedule_interval,
               description='Test foley',
               start_date=datetime(2020, 4, 19),
               default_args=None,
               max_active_runs=2,
               default_view='graph',  # Default view graph
               #orientation='TB',     # Top-Bottom graph
               on_success_callback=_task_success_callback,
               #on_failure_callback=outer_task_failure_callback,
               catchup=False,  # Do not catch up; run only the latest
               params={
                   "param1": "value1",
                   "param2": "value2"
               })

################################### START ######################################

dag_chain = []

start = DummyOperator(task_id='start', retries=3, dag=main_dag)
dag_chain.append(start)

step1 = BashOperator(
    task_id='step1',
    bash_command='pwd',
    dag=main_dag,
)
dag_chain.append(step1)

step2 = BashOperator(
    task_id='step2',
    bash_command='exit 0',
    dag=main_dag,
)
dag_chain.append(step2)

end = DummyOperator(task_id='end', dag=main_dag)
dag_chain.append(end)

chain(*dag_chain)
I have an event handler function _task_success_callback that handles success. In the DAG I set on_success_callback=_task_success_callback, which captures that event.
And it works... but now I need to pass some parameters into _task_success_callback. What is the best method?
Since that function receives the context, I tried defining parameters on the DAG, as you can see:
params={
    "param1": "value1",
    "param2": "value2"
}
But it seems I cannot access them from the callback.
My question is: how can I use the context to pass those parameters into _task_success_callback?
NOTE: I saw the similar question "How to pass parameters to Airflow on_success_callback and on_failure_callback" with one answer, and it works. But what I am looking for is to use the context to pass parameters.
Upvotes: 2
Views: 23950
You can pass user-defined parameters directly in the DAG definition for either on_success_callback or on_failure_callback, like this:
params={'custom_param': 'custom_param_value'}
along with the on_success_callback or on_failure_callback function. Airflow adds the values as key-value pairs in a dictionary inside context['params'].
You can then access the value of your custom parameter anywhere inside the callback like this:
context['params'].get('custom_param')
from datetime import datetime, timedelta

from airflow import DAG

def dag_failure_notification_alert(context):
    print(context['params'].get('custom_param'))

default_args = {
    "owner": "abcd",
    "start_date": datetime(2021, 12, 12),
    'retries': 0,
    'retry_delay': timedelta(),
    "schedule_interval": "@daily"
}

dag = DAG('alert_notification_dummy_dag',
          default_args=default_args,
          catchup=False,
          on_failure_callback=dag_failure_notification_alert,
          params={
              'custom_param': 'custom_param_value'
          })
You can pass any number of parameters this way, including validated ones, e.g.
params={"x": Param(5, type="integer", minimum=3)},
or plain values such as
params={'x': [10, 20, 30]}
For reference, see the Airflow documentation on params.
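A minimal sketch of the Param variant, assuming Airflow 2.2+ (where Param lives in airflow.models.param); the DAG id and the notify callback here are placeholders, not names from the original answer:

from datetime import datetime

from airflow import DAG
from airflow.models.param import Param

def notify(context):  # hypothetical callback
    # Validated params are read the same way, via context['params']
    print(context['params'].get('x'))

dag = DAG('param_validation_demo',  # hypothetical DAG id
          start_date=datetime(2021, 12, 12),
          schedule_interval=None,
          catchup=False,
          on_failure_callback=notify,
          # Param enforces a JSON-schema constraint: integer, minimum 3
          params={'x': Param(5, type='integer', minimum=3)})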
Upvotes: 1
I used partials to call a shared callback function with different connections:
from functools import partial

# The callback accepts extra parameters beyond the context
def my_conn_callback(context, slack_conn_id, slack_channel):
    print(context)
    print(slack_conn_id)
    print(slack_channel)

# Bind the extra parameters; the result is what you pass as on_failure_callback
task_fail_slack_alert_callback_my_conn = partial(
    my_conn_callback,
    slack_conn_id="slack-conn",
    slack_channel="#slack_channel",
)

# This is how Airflow calls it internally:
task_fail_slack_alert_callback_my_conn('context')
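For completeness, a minimal sketch of attaching the partial to a task; the DAG id and operator are placeholders, and the import paths match the Airflow 1.x style used in the question:

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

with DAG('partial_callback_demo',  # hypothetical DAG id
         start_date=datetime(2021, 12, 12),
         schedule_interval=None,
         catchup=False) as dag:
    task = DummyOperator(
        task_id='some_task',
        # Airflow passes the context as the single positional argument,
        # which the partial forwards to my_conn_callback
        on_failure_callback=task_fail_slack_alert_callback_my_conn,
    )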
Upvotes: 1
You can create a task whose only purpose is to push configuration settings through XComs. You can then pull the configuration via the context, since the task_instance object is included in the context.
def push_configuration(ti, params):
    ti.xcom_push(key='params', value=params)

def _task_success_callback(context):
    ti = context.get('ti')
    params = ti.xcom_pull(key='params', task_ids='Settings')
    ...

step0 = PythonOperator(
    task_id='Settings',
    python_callable=push_configuration,
    op_kwargs={'params': params})

step1 = BashOperator(
    task_id='step1',
    bash_command='pwd',
    on_success_callback=_task_success_callback)
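A minimal end-to-end sketch of this pattern, assuming Airflow 2.x (which injects ti into the callable automatically; on 1.10 you would set provide_context=True). The DAG id and the params dict are placeholders:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

def push_configuration(ti, params):
    # Store the settings so any later callback can retrieve them
    ti.xcom_push(key='params', value=params)

def _task_success_callback(context):
    ti = context.get('ti')
    params = ti.xcom_pull(key='params', task_ids='Settings')
    print(params)

with DAG('xcom_callback_demo',  # hypothetical DAG id
         start_date=datetime(2020, 4, 19),
         schedule_interval=None,
         catchup=False) as dag:
    step0 = PythonOperator(
        task_id='Settings',
        python_callable=push_configuration,
        op_kwargs={'params': {'param1': 'value1', 'param2': 'value2'}})
    step1 = BashOperator(
        task_id='step1',
        bash_command='pwd',
        on_success_callback=_task_success_callback)
    step0 >> step1  # Settings must run first so the XCom exists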
Upvotes: 1
Recall that Airflow DAG files are simply Python, and provided you don't introduce too much overhead during their parsing (Airflow parses the files frequently, and that overhead can add up), you can use everything Python can do. In particular, for your case I recommend returning a nested function (a closure) for your callback:
Put this in a file adjacent to your DAG files, say on_callbacks.py:
def success_ms_teams(param_1, param_2):
    def callback_func(context):
        print(f"param_1: {param_1}")
        print(f"param_2: {param_2}")
        # ... trimmed for brevity ...
        ms_teams_op.execute(context)
    return callback_func
Then in your DAG files you can do this:
from airflow import models
from on_callbacks import success_ms_teams

with models.DAG(
    ...
    on_success_callback=success_ms_teams(
        "value1",  # These values become the
        "value2",  # `param_1` and `param_2`
    )
) as dag:
    ...
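A nice property of this closure approach is that the parameters are bound at DAG-parse time in plain Python, without involving Airflow's params or XCom machinery; functools.partial (as used in another answer above) achieves the same effect without the nested function.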
Upvotes: 5