George Mansoor

Reputation: 141

Pass other arguments to on_failure_callback

I'd like to pass additional arguments to my on_failure_callback function, but it only seems to accept "context". How do I pass other arguments to that function? This matters especially because I'd like to define the function in a separate module so it can be used in all my DAGs.

My current default_args looks like this:

default_args = {
  'owner': 'Me',
  'depends_on_past': True,
  'start_date': datetime(2016, 1, 1),
  'email': ['[email protected]'],
  'email_on_failure': False,
  'email_on_retry': False,
  'retries': 1,
  'retry_delay': timedelta(minutes=1),
  'on_failure_callback': notify_failure,
  'max_active_runs': 1
}

If I try something like this, Airflow complains:

default_args = {
  'owner': 'Me',
  'depends_on_past': True,
  'start_date': datetime(2016, 1, 1),
  'email': ['[email protected]'],
  'email_on_failure': False,
  'email_on_retry': False,
  'retries': 1,
  'retry_delay': timedelta(minutes=1),
  'on_failure_callback': notify_failure(context,arg1,arg2),
  'max_active_runs': 1
}

So I'm not sure how to pass arg1 and arg2 to my notify_failure function, which I would like to define in a separate module that I can simply import into my DAG.

Upvotes: 13

Views: 14130

Answers (3)

CallumO

Reputation: 171

As a heads-up, my workaround was to use a lambda function that receives the context parameter and passes it, together with the extra arguments, to the function you actually want to call:

on_failure_callback = lambda context: my_function(context, arg2)

Full example:

def my_function(context, arg2):
  # function code here
  pass

default_args = {
  'owner': 'myself',
   ...
   ...
  'on_failure_callback': lambda context: my_function(context, arg2),
}
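One caveat with the lambda approach, sketched below under assumptions: a lambda looks up outer variables when it is called, not when it is defined. If callbacks are built in a loop, binding the value through a default argument avoids every callback seeing the last loop value. The `fake_context` dict and its `task_id` key are illustrative stand-ins, not the real Airflow context.

```python
def my_function(context, arg2):
    # Hypothetical handler: returns a string instead of sending an alert.
    return "{} failed, extra arg: {}".format(context["task_id"], arg2)

# Late binding: each lambda reads `name` when it is called,
# by which time the loop has finished and `name` is "b".
late = [lambda context: my_function(context, name) for name in ("a", "b")]

# Binding through a default argument freezes the value at definition time.
bound = [lambda context, name=name: my_function(context, name)
         for name in ("a", "b")]

fake_context = {"task_id": "load_data"}  # illustrative stand-in only
late_results = [cb(fake_context) for cb in late]
bound_results = [cb(fake_context) for cb in bound]
```

With a single lambda defined once at module level, as in the answer above, this issue does not arise.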

Upvotes: 1

Vincent Claes

Reputation: 4768

You can use a nested function (a closure) for this:

def generic_failure(arg1, arg2):
    def failure(context):
        message = 'we have a function that failed with args: {ARG1}, {ARG2}'.format(ARG1=arg1, ARG2=arg2)
        print(message)
        return message
    return failure

arg1 = 'arg1'
arg2 = 'arg2'

default_args = {
  'owner': 'Me',
  'on_failure_callback': generic_failure(arg1, arg2),
}
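To see why this works, here is a minimal sketch that runs without Airflow. The outer function runs once, when default_args is built, and returns the inner function. The inner function is only called later, with a single context argument. The manual call at the end is an assumption that stands in for what the scheduler does on failure, and the context dict is a simplified stand-in.

```python
def generic_failure(arg1, arg2):
    # Runs once, when default_args is built; arg1 and arg2 are captured.
    def failure(context):
        # Runs later, with only the context supplied by the caller.
        return "failed with args: {}, {} (context keys: {})".format(
            arg1, arg2, sorted(context))
    return failure

default_args = {
    "owner": "Me",
    "on_failure_callback": generic_failure("arg1", "arg2"),
}

# Stand-in for the scheduler invoking the stored callback on failure.
callback = default_args["on_failure_callback"]
result = callback({"ds": "2016-01-01"})
```

By contrast, writing notify_failure(context, arg1, arg2) directly in the dict calls the function immediately, at a point where no context exists yet.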

Upvotes: 5

cwurtz

Reputation: 3257

Assuming the args are something you can define at the DAG level, you can use partial from the functools module, i.e.:

from functools import partial

def generic_failure(arg1, arg2, context):
  # do whatever
  pass

default_args = {
  'owner': 'Me',
  'depends_on_past': True,
  'start_date': datetime(2016, 1, 1),
  'email': ['[email protected]'],
  'email_on_failure': False,
  'email_on_retry': False,
  'retries': 1,
  'retry_delay': timedelta(minutes=1),
  'on_failure_callback': partial(generic_failure, arg1, arg2),
  'max_active_runs': 1
}

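Calling partial(generic_failure, arg1, arg2) returns a callable with arg1 and arg2 already bound as the leading positional arguments. It expects only the remaining parameters of generic_failure, which in the example above is the single parameter context. This is why context comes last in the function signature.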
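If you would rather keep context as the first parameter, partial can also bind the extra arguments by keyword. The sketch below assumes the callback is invoked with context as its only positional argument. The function bodies and the context dict are illustrative stand-ins.

```python
from functools import partial

def failure_context_last(arg1, arg2, context):
    # Positional binding requires context to be the last parameter.
    return (arg1, arg2, context)

def failure_context_first(context, arg1, arg2):
    # Keyword binding lets context stay first.
    return (arg1, arg2, context)

positional_cb = partial(failure_context_last, "x", "y")
keyword_cb = partial(failure_context_first, arg1="x", arg2="y")

ctx = {"ds": "2016-01-01"}  # simplified stand-in for the context dict
positional_result = positional_cb(ctx)
keyword_result = keyword_cb(ctx)
```

Both callables accept a single argument and produce the same result, so either can be assigned to 'on_failure_callback'.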

Upvotes: 34
