Oscar Foley

Reputation: 7025

In Airflow how can I pass parameters using context to on_success_callback function handler?

This is my test code:

import airflow
from airflow import DAG
from airflow.operators import MSTeamsWebhookOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime
from transaction_analytics import helpers
from airflow.utils.helpers import chain

# Parameters & variables
schedule_interval        = "0 20 * * *"

def _task_success_callback(context):
    dagid         = context["task_instance"].dag_id
    duration      = context["task_instance"].duration
    executiondate = context["execution_date"]
    logurl        = context["task_instance"].log_url.replace("localhost", "agbqhsbldd017v.agb.rbxd.ds")# workaround until we config airflow
    pp1           = context["params"].param1 
    #pp1          = "{{ params.param1 }}"
    ms_teams_op = MSTeamsWebhookOperator(
        task_id="success_notification",
        http_conn_id="msteams_airflow",
        message="DAG {ppram1} `{dag}`  finished successfully!".format(dag=context["task_instance"].dag_id, ppram1=pp1),
        subtitle="Execution Date = {p1}, Duration = {p2}".format(p1=executiondate,p2=duration),
        button_text = "View log",
        button_url = "{log}".format(log=logurl),
        theme_color="00FF00"#,
        #proxy= "http://10.72.128.202:3128"
    )
    ms_teams_op.execute(context)

main_dag = DAG('test_foley',
        schedule_interval=schedule_interval,
        description='Test foley',
        start_date=datetime(2020, 4, 19),
        default_args=None,
        max_active_runs=2,
        default_view='graph',     # Default view graph
        #orientation='TB', # Top-Bottom graph
        on_success_callback=_task_success_callback,
        #on_failure_callback=outer_task_failure_callback,
        catchup=False, # Do not catchup, run only latest
        params={
          "param1": "value1",
          "param2": "value2"
        }
       )

################################### START ######################################
dag_chain = []

start = DummyOperator(task_id='start', retries = 3, dag=main_dag)
dag_chain.append(start)

step1 = BashOperator(
    task_id='step1',
    bash_command='pwd',
    dag=main_dag,
)
dag_chain.append(step1)

step2 = BashOperator(
    task_id='step2',
    bash_command='exit 0',
    dag=main_dag,
)
dag_chain.append(step2)


end = DummyOperator(task_id='end', dag=main_dag)
dag_chain.append(end)

chain(*dag_chain)

I have an event handler function _task_success_callback that handles success. In the DAG I register it with on_success_callback=_task_success_callback, which captures that event.

And it works... but now I need to pass some parameters to _task_success_callback. What is the best way to do that?

Since that function receives the context, I tried to define parameters in the DAG, as you can see:

        params={
          "param1": "value1",
          "param2": "value2"
        }

But it seems I cannot access them.

My questions are:

  1. What am I doing wrong to access params?
  2. Is there a better way to pass parameters?

NOTE: I saw the similar question How to pass parameters to Airflow on_success_callback and on_failure_callback, which has one answer... and it works. But what I am looking for is a way to pass the parameters using the context....

Upvotes: 2

Views: 23950

Answers (4)

Shubhank Gupta

Reputation: 825

You can pass user-defined parameters directly in the DAG definition, alongside on_success_callback or on_failure_callback, like this: params={'custom_param': 'custom_param_value'}. Airflow adds these values as key-value pairs in a dictionary available as context['params'].

You can then access the value of your custom parameter anywhere inside the callback like this: context['params'].get('custom_param')

from datetime import datetime, timedelta

from airflow import DAG

def dag_failure_notification_alert(context):
    print(context['params'].get('custom_param'))

default_args = {
    "owner": "abcd",
    "start_date": datetime(2021, 12, 12),
    'retries': 0,
    'retry_delay': timedelta(),
    "schedule_interval": "@daily"
}

dag = DAG('alert_notification_dummy_dag',
          default_args=default_args,
          catchup=False,
          on_failure_callback=dag_failure_notification_alert,
          params={
              'custom_param': 'custom_param_value'
          }
          )

You can pass any number of parameters here, for example

params={"x": Param(5, type="integer", minimum=3)},

or

params={'x': [10,20,30]}

For reference, see the Params documentation.
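
For completeness, here is a minimal runnable sketch of the Param variant (the DAG id, dates, and callback are illustrative, not part of the original answer; Param is imported from airflow.models.param, available in Airflow 2.2+):

from datetime import datetime

from airflow import DAG
from airflow.models.param import Param

def notify(context):
    # illustrative callback: read the validated param from the context
    print(context['params'].get('x'))

dag = DAG(
    'param_example_dag',                      # illustrative DAG id
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,
    on_failure_callback=notify,
    params={'x': Param(5, type='integer', minimum=3)},  # validated when a run is triggered
)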

Upvotes: 1

felicityiz

Reputation: 9

I used functools.partial to call a callback function with different connections:

from functools import partial

# pass extra params here
def my_conn_callback(context, slack_conn_id, slack_channel):
    print(context)
    print(slack_conn_id)
    print(slack_channel)

# this is for the on_failure_callback
task_fail_slack_alert_callback_my_conn = partial(
    my_conn_callback, slack_conn_id="slack-conn", slack_channel="#slack_channel"
)

# this is how airflow calls it internally:
print(task_fail_slack_alert_callback_my_conn('context'))
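
For reference, a minimal sketch (not part of the original answer) of registering that partial as a task's callback; the DAG id, task id, and bash command below are illustrative:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

with DAG('callback_partial_example', start_date=datetime(2022, 1, 1), schedule_interval=None) as dag:
    step = BashOperator(
        task_id='step',
        bash_command='exit 1',  # fail on purpose so the on_failure_callback fires
        on_failure_callback=task_fail_slack_alert_callback_my_conn,
    )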

Upvotes: 1

kemazariegos

Reputation: 41

You can create a task whose only purpose is to push configuration settings through XComs. You can then pull the configuration via the context, since the task_instance object is included in it.

def push_configuration(ti, params):
    # push the settings as an XCom (same key that the callback pulls below)
    ti.xcom_push(key='params', value=params)

def _task_success_callback(context):
    ti = context.get('ti')
    # pull the settings pushed by the 'Settings' task
    params = ti.xcom_pull(key='params', task_ids='Settings')
    ...


step0 = PythonOperator(
        task_id='Settings',
        python_callable=push_configuration,
        op_kwargs={'params': params})

step1 = BashOperator(
        task_id='step1',
        bash_command='pwd',
        on_success_callback=_task_success_callback)
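
One assumption worth making explicit: the Settings task has to run before step1 in the same DAG run so the XCom exists when the callback pulls it, e.g. with an ordering line like:

step0 >> step1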

Upvotes: 1

joebeeson

Reputation: 4366

Recall that Airflow DAG files are simply Python, and provided you don't introduce too much overhead during their parsing (Airflow parses the files frequently, and that overhead can add up), you can use everything Python can do. In particular, for your case I recommend returning a nested function (closure) for your callback:

Put this in a file adjacent to your DAG files, say on_callbacks.py:

def success_ms_teams(param_1, param_2):

    def callback_func(context):
        print(f"param_1: {param_1}")
        print(f"param_2: {param_2}")
        # ... build ms_teams_op here (trimmed for brevity) ...
        ms_teams_op.execute(context)

    return callback_func

Then in your DAG file you can do this:

from airflow import models

from on_callbacks import success_ms_teams

with models.DAG(
    ...
    on_success_callback=success_ms_teams(
        "value1", # These values become the
        "value2", # `param_1` and `param_2` 
    )
) as dag:
    ...

Upvotes: 5
