Reputation: 1713
I need to be able to access default_args, defined as part of the DAG definition, inside a PythonOperator's python_callable. Maybe it's my unfamiliarity with Python or Airflow in general, but could someone guide me on how to achieve this?
The following is a code sample of what I am trying to achieve:
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email': '[email protected]',
'email_on_failure': '[email protected]',
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
'start_date': datetime(2017, 5, 15, 23, 20),
'end_date': datetime(2017, 5, 16, 23, 45),
'touchfile_path': '/user/myname/touchfiles/',
}
dag = DAG(
'test',
default_args=default_args,
template_searchpath=['/Users/myname/Desktop/utils/airflow/resources'],
user_defined_macros=dict(SCHEMA_NAME='abc'),
#schedule_interval='*/2 * * * * ')
schedule_interval='@once')
def webhdfs_touchfile_create(ds, *args, **kwargs):
web_hdfs_hook = WebHDFSHook('webhdfs_default')
client = web_hdfs_hook.get_conn()
client.write("/user/myname/airflow_hdfs","stringToWrite")
pp.pprint(kwargs)
task1 = PythonOperator(
task_id='task1',
provide_context=True,  # enabling this passes the context arguments automatically to your callable
python_callable=webhdfs_touchfile_create,
templates_dict={'attr1': "{{ default_args['touchfile_path'] }}"},
dag=dag)
Since templates_dict is the only PythonOperator attribute on which Jinja templating works, how can I retrieve the 'touchfile_path' parameter in there?
Upvotes: 9
Views: 13171
Reputation: 6412
In Airflow 2.0, with the TaskFlow API, a "Python callable" is sometimes just a function decorated with @task. In this case, you can retrieve the default args from the context:
from airflow.decorators import task
from airflow.operators.python import get_current_context
@task
def my_task():
context = get_current_context()
email_on_failure = context["dag"].default_args["email_on_failure"]
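The key chain context["dag"].default_args[...] can be sketched in plain Python, with the Airflow context stubbed as a dict (in real Airflow it comes from get_current_context(); FakeDag here is a hypothetical stand-in for the DAG object):

```python
# Stub of the Airflow task context: context["dag"] carries the DAG object,
# whose default_args attribute is the dict passed at DAG construction time.
class FakeDag:
    def __init__(self, default_args):
        self.default_args = default_args

context = {"dag": FakeDag({"email_on_failure": False, "retries": 1})}

# Same attribute/key chain the answer uses inside the @task function:
email_on_failure = context["dag"].default_args["email_on_failure"]
print(email_on_failure)  # False
```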
Upvotes: 5
Reputation: 3056
There are 2 mechanisms for passing variables in Airflow:
(1) Jinja templating
(2) specialized operator properties
Using approach (1), variables can be passed via the user_defined_macros
property at the DAG level.
Using approach (2), you should look at the specific operator's properties.
Note that some operator properties are processed by Jinja, so you can use template syntax in them.
Here is a working example:
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': days_ago(2),
'email': ['[email protected]'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
'custom_key1': 'custom_value1',
'custom_key2': 'custom_value2'
}
dag = DAG(
'tutorial',
default_args=default_args,
description='A simple tutorial DAG',
schedule_interval=timedelta(days=1),
user_defined_macros=default_args,
)
bash_command = """
echo "access via DAG's user_defined_macros = {{ custom_key1 }}"
echo "access via Operator's params = {{ params.custom_key2 }}"
"""
t1 = BashOperator(
task_id='print_in_bash_op',
bash_command=bash_command,
params=default_args,
dag=dag,
)
def myfunc(**context):
print(context['templates_dict']['custom_key1'])
print(context['templates_dict']['custom_key2'])
t2 = PythonOperator(
task_id='print_in_python_op',
python_callable=myfunc,
templates_dict=default_args,
provide_context=True,
dag=dag,
)
templates_dict={
'custom_key1': '{{ custom_key1 }}',
'custom_key2': '{{ custom_key2 }}'
}
t3 = PythonOperator(
task_id='print_in_python_op_2',
python_callable=myfunc,
templates_dict=templates_dict,
provide_context=True,
dag=dag,
)
t1 >> t2 >> t3
Addition based on comments
Whether variables can be used depends entirely on the operator.
With approach (2), there are typically specialized properties for passing information, such as params and templates_dict in the example above.
With approach (1), such properties must be rendered with Jinja in the operator's code (they are marked as templated
in the documentation). For instance, the properties used above are templated
properties.
Everywhere Airflow Macros
can be used, user variables (defined via user_defined_macros
) can also be used.
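The rendering step can be illustrated in pure Python. Airflow itself renders templated properties with Jinja; str.format merely stands in for it below, with the user_defined_macros dict playing the role of the render context:

```python
# default_args doubles as user_defined_macros in the example above, so its
# keys become names available to the template engine.
default_args = {'custom_key1': 'custom_value1', 'custom_key2': 'custom_value2'}

# A templated property before rendering. Jinja would see '{{ custom_key1 }}';
# str.format uses '{custom_key1}' for this illustration only.
bash_command = 'echo "access via user_defined_macros = {custom_key1}"'

rendered = bash_command.format(**default_args)
print(rendered)  # echo "access via user_defined_macros = custom_value1"
```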
Upvotes: 6
Reputation: 472
The default args should be available in *args. Have you tried accessing touchfile_path from there?
Upvotes: 0
Reputation: 9477
Since they are defined in the same file at the same level, you can simply do:
def webhdfs_touchfile_create(ds, *args, **kwargs):
web_hdfs_hook = WebHDFSHook('webhdfs_default')
client = web_hdfs_hook.get_conn()
client.write("/user/myname/airflow_hdfs","stringToWrite")
pp.pprint(kwargs)
pp.pprint(default_args['touchfile_path'])
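This works because default_args is an ordinary module-level dict. A minimal self-contained sketch (the DAG and WebHDFSHook setup are omitted, and the hook call is replaced by a return so the function runs standalone):

```python
from datetime import timedelta

# Plain dict at module level, as in the question (trimmed for brevity).
default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'touchfile_path': '/user/myname/touchfiles/',
}

def webhdfs_touchfile_create(ds, *args, **kwargs):
    # The callable can read the module-level dict directly,
    # no Jinja templating required.
    return default_args['touchfile_path']

print(webhdfs_touchfile_create('2017-05-15'))  # /user/myname/touchfiles/
```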
Upvotes: 1