Reputation: 11
I'm new to Airflow. Can someone please help me with this? I'm unable to access `db_conn` inside my custom operator, even though this argument is defined in `default_args`.
**Dag details:**

```python
from datetime import datetime
from airflow import DAG

default_args = {
    'owner': 'airflow',
    'email': ['[email protected]'],
    'db_conn': 'database_connection'
}

dag = DAG(dag_id='my_custom_operator_dag', description='Another tutorial DAG',
          schedule_interval='0 12 * * *',
          start_date=datetime(2020, 8, 6),
          catchup=False,
          default_args=default_args)

operator_task_start = MyOperator(
    task_id='my_custom_operator', dag=dag
)
```
**Operator details:**

```python
class MyOperator(BaseOperator):
    # @apply_defaults
    def __init__(self,
                 *args,
                 **kwargs):
        super(MyOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        log.info('owner: %s', self.owner)
        log.info('email: %s', self.email)
        log.info('db_conn: %s', self.db_conn)
        # Error here: AttributeError: 'MyOperator' object has no attribute 'db_conn'
```
Upvotes: 1
Views: 1625
Reputation: 11597
You seem to have misunderstood `default_args`. `default_args` is just a shorthand (code-cleanup / refactoring / brevity) to pass common args (those that have the same value for all operators of the DAG, like `owner`) to all your operators, by setting them up as defaults and passing them to the `DAG` itself. Quoting the docstring comment from the `DAG` params:
> :param default_args: A dictionary of default parameters to be used as constructor keyword parameters when initialising operators. Note that operators have the same hook, and precede those defined here, meaning that if your dict contains `'depends_on_past': True` here and `'depends_on_past': False` in the operator's call `default_args`, the actual value will be `False`.
> :type default_args: dict
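The precedence rule in that docstring can be sketched in plain Python (a simplified model of the merge, not Airflow's actual implementation):

```python
# Simplified model of how default_args interact with operator kwargs:
# explicit operator-level arguments take precedence over DAG-level defaults.
def resolve_operator_args(dag_default_args, operator_kwargs):
    merged = dict(dag_default_args)   # start from the DAG-level defaults
    merged.update(operator_kwargs)    # operator-level values win on conflict
    return merged

defaults = {'owner': 'airflow', 'depends_on_past': True}
resolved = resolve_operator_args(defaults, {'depends_on_past': False})
print(resolved['depends_on_past'])  # operator-level False wins
print(resolved['owner'])            # inherited from default_args
```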
So clearly, for `default_args` to work, any keys that you pass there should be arguments of your Operator classes.
Not just that: do note that passing invalid (non-existent) arguments to Operator constructors will be penalized in Airflow 2.0 (so better not pass any):

> Invalid arguments were passed to {c} (task_id: {t}). Support for passing such arguments will be dropped in future. ..
Hopefully, by now it must be clear that to make this work, you must add a param `db_conn` in the constructor of your `MyOperator` class:
**Operator details:**

```python
class MyOperator(BaseOperator):
    # @apply_defaults
    def __init__(self,
                 db_conn: str,
                 *args,
                 **kwargs):
        super(MyOperator, self).__init__(*args, **kwargs)
        self.db_conn: str = db_conn
```
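To see why this fixes the `AttributeError`, here is a minimal stand-in (plain Python, no Airflow installed; `FakeBaseOperator` and `apply_defaults_like` are made-up names) mimicking how `default_args` get fed into the constructor as keyword arguments:

```python
# Illustration only -- not Airflow's real code. Missing constructor kwargs
# are filled from the DAG-level default_args before __init__ runs.
class FakeBaseOperator:
    def __init__(self, task_id, **kwargs):
        self.task_id = task_id

def apply_defaults_like(cls, default_args, **kwargs):
    for key, value in default_args.items():
        kwargs.setdefault(key, value)   # defaults only fill in missing kwargs
    return cls(**kwargs)

class MyOperator(FakeBaseOperator):
    def __init__(self, db_conn, **kwargs):   # db_conn is now a real parameter
        super().__init__(**kwargs)
        self.db_conn = db_conn               # so the attribute exists in execute()

default_args = {'db_conn': 'database_connection'}
op = apply_defaults_like(MyOperator, default_args, task_id='my_custom_operator')
print(op.db_conn)  # 'database_connection' -- no AttributeError
```

Without the `db_conn` parameter in `__init__`, the key would never reach the class and `self.db_conn` would not exist, which is exactly the error you saw.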
And while we are at it, may I offer you a suggestion: for something like a connection, preferably use the Connection feature offered by Airflow, which eases your interaction with external services. (And if you create more than one Connection with the same `conn_id`, Airflow will randomly distribute calls to one of those.) Quoting the docs:

> When there is more than one connection with the same conn_id, the get_connection() method on BaseHook will choose one connection randomly. This can be used to provide basic load balancing and fault tolerance, when used in conjunction with retries.

> They also use the airflow.models.connection.Connection model to retrieve hostnames and authentication information. Hooks keep authentication code and information out of pipelines, centralized in the metadata database.
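Connections in Airflow can also be supplied as URIs (for instance via an `AIRFLOW_CONN_<CONN_ID>` environment variable), so credentials stay out of your DAG files entirely. The sketch below uses a made-up URI to show which fields such a Connection carries, using only the standard library (inside the operator you would instead resolve the `conn_id` through a hook):

```python
from urllib.parse import urlsplit

# A connection expressed as a URI (the host/user/password here are
# hypothetical examples, not real credentials).
uri = 'postgres://user:secret@db.example.com:5432/mydb'

parts = urlsplit(uri)
print(parts.scheme)    # postgres  -> connection type
print(parts.hostname)  # db.example.com
print(parts.port)      # 5432
print(parts.username)  # user
```

This is essentially what Airflow's Connection model stores for you centrally, keyed by `conn_id`, instead of you threading a raw string through `default_args`.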
Upvotes: 2