Kam

Reputation: 11

Can't access Airflow default_args

I'm new to Airflow. Can someone please help me with this? I'm unable to access 'db_conn' inside my custom operator, even though the argument is defined in default_args.

**Dag details:**

from datetime import datetime

from airflow import DAG

default_args = {
    'owner': 'airflow',
    'email': ['[email protected]'],
    'db_conn': 'database_connection'
}

dag = DAG(dag_id='my_custom_operator_dag', description='Another tutorial DAG',
          schedule_interval='0 12 * * *',
          start_date=datetime(2020, 8, 6),
          catchup=False,
          default_args=default_args)

operator_task_start = MyOperator(task_id='my_custom_operator', dag=dag)

**Operator details:**
from airflow.models import BaseOperator


class MyOperator(BaseOperator):

    #@apply_defaults
    def __init__(self,
                 *args,
                 **kwargs):
        super(MyOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        self.log.info('owner: %s', self.owner)
        self.log.info('email: %s', self.email)
        self.log.info('db_conn: %s', self.db_conn)
        # Error here: AttributeError: 'MyOperator' object has no attribute 'db_conn'

Upvotes: 1

Views: 1625

Answers (1)

y2k-shubham

Reputation: 11597

You seem to have misunderstood default_args. default_args is just a shorthand (for code cleanup / refactoring / brevity) for passing common args (those that have the same value for all operators in the DAG, like owner) to all your operators, by setting them as defaults on the DAG itself. Quoting the docstring from the DAG params:

:param default_args: A dictionary of default parameters to be used
        as constructor keyword parameters when initialising operators.
        Note that operators have the same hook, and precede those defined
        here, meaning that if your dict contains `'depends_on_past': True`
        here and `'depends_on_past': False` in the operator's call
        `default_args`, the actual value will be `False`.
:type default_args: dict

So clearly, for default_args to work, any key that you pass there must be an accepted constructor argument of your operator classes.
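To see the mechanics, here is a minimal, simplified sketch (plain Python, not actual Airflow source code) of how default_args are merged into an operator's constructor kwargs, with operator-level values taking precedence, exactly as the docstring above describes:

```python
# Simplified model of Airflow's default_args merging (illustrative only,
# not the real implementation): DAG-level defaults fill in any kwarg the
# operator call did not supply, and explicit operator kwargs win.
def merge_args(default_args, operator_kwargs):
    merged = dict(default_args)       # start from the DAG-level defaults
    merged.update(operator_kwargs)    # operator-level values take precedence
    return merged

default_args = {'owner': 'airflow', 'depends_on_past': True}
operator_kwargs = {'task_id': 'my_task', 'depends_on_past': False}

merged = merge_args(default_args, operator_kwargs)
print(merged['depends_on_past'])  # False -- the operator's own value wins
print(merged['owner'])            # airflow -- filled in from default_args
```

The merged dict is then handed to the operator's `__init__` as keyword arguments, which is why every key in default_args has to be a parameter the operator actually accepts.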


Not just that: do note that passing invalid (non-existent) arguments to operator constructors is penalized in Airflow 2.0 (so better not to pass any):

'Invalid arguments were passed to {c} (task_id: {t}). '
'Support for passing such arguments will be dropped in '
'future. ..

Hopefully it is now clear that to make this work, you must add a db_conn param to the constructor of your MyOperator class:

**Operator details:**
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class MyOperator(BaseOperator):

    @apply_defaults  # needed on Airflow 1.x so default_args are injected
    def __init__(self,
                 db_conn: str,
                 *args,
                 **kwargs):
        super(MyOperator, self).__init__(*args, **kwargs)
        self.db_conn: str = db_conn

And while we are at it, may I offer a suggestion: for something like a connection, preferably use the Connection feature offered by Airflow, which eases your interaction with external services:

  • makes them manageable (view / edit via UI)
  • secure (they are stored encrypted in db)
  • support for load balancing (define multiple connections with same conn_id, Airflow will randomly distribute calls to one of those)

When there is more than one connection with the same conn_id, the get_connection() method on BaseHook will choose one connection randomly. This can be used to provide basic load balancing and fault tolerance, when used in conjunction with retries.

Hooks also use the airflow.models.connection.Connection model to retrieve hostnames and authentication information, keeping authentication code and credentials out of your pipelines and centralized in the metadata database.
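Inside your operator's execute you would then fetch credentials with something like BaseHook.get_connection('my_db') (the conn id my_db is a made-up example). Airflow also lets you define connections as URI-valued environment variables named AIRFLOW_CONN_&lt;CONN_ID&gt;; the stdlib-only sketch below parses such a URI to show what fields a Connection carries, without needing Airflow installed:

```python
from urllib.parse import urlparse, unquote

# Airflow accepts connections as URIs via AIRFLOW_CONN_<CONN_ID> env vars,
# e.g. AIRFLOW_CONN_MY_DB='postgresql://user:secret@db.example.com:5432/sales'
# (the conn id and credentials here are made-up examples). This parse
# mirrors the fields a Connection object exposes: conn_type, host, login,
# password, port, schema.
def parse_conn_uri(uri):
    parts = urlparse(uri)
    return {
        'conn_type': parts.scheme,
        'host': parts.hostname,
        'login': unquote(parts.username) if parts.username else None,
        'password': unquote(parts.password) if parts.password else None,
        'port': parts.port,
        'schema': parts.path.lstrip('/'),
    }

conn = parse_conn_uri('postgresql://user:secret@db.example.com:5432/sales')
print(conn['host'], conn['port'], conn['schema'])  # db.example.com 5432 sales
```

Your operator then only needs to hold the conn_id string; the hostname and password live in Airflow's (encrypted) metadata database or environment, not in your DAG file.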

Upvotes: 2
