Saurabh Mishra

Reputation: 1713

How to retrieve default args in python callable

I need to be able to access the default_args defined as part of the DAG definition from within a PythonOperator's python_callable. Maybe it's my unfamiliarity with Python or Airflow in general, but could someone guide me on how to achieve this?

Following is a code sample of what I am trying to achieve:

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': '[email protected]',
    'email_on_failure': '[email protected]',
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'start_date': datetime(2017, 5, 15, 23, 20),
    'end_date': datetime(2017, 5, 16, 23, 45),
    'touchfile_path': '/user/myname/touchfiles/',
}

dag = DAG(
    'test',
    default_args=default_args,
    template_searchpath=['/Users/myname/Desktop/utils/airflow/resources'],
    user_defined_macros=dict(SCHEMA_NAME='abc'),
    #schedule_interval='*/2 * * * * ')
    schedule_interval='@once')

def webhdfs_touchfile_create(ds, *args, **kwargs):
    web_hdfs_hook = WebHDFSHook('webhdfs_default')
    client = web_hdfs_hook.get_conn()
    client.write("/user/myname/airflow_hdfs","stringToWrite")
    pp.pprint(kwargs)

task1 = PythonOperator(
    task_id='task1',
    provide_context=True, #enabling this would allow to pass arguments automatically to your callable function
    python_callable=webhdfs_touchfile_create,
    templates_dict={'attr1': "{{ default_args['touchfile_path'] }}"},
    dag=dag)

Since templates_dict is the only PythonOperator attribute for which Jinja templating works, how can I retrieve the 'touchfile_path' parameter in there?

Upvotes: 9

Views: 13171

Answers (4)

Noumenon

Reputation: 6412

In Airflow 2.0, with the TaskFlow API, a "Python callable" is often just a function decorated with @task. In that case, you can retrieve the default args from the task context:

from airflow.decorators import task
from airflow.operators.python import get_current_context

@task
def my_task():
    context = get_current_context()
    email_on_failure = context["dag"].default_args["email_on_failure"]

Upvotes: 5

Ilya Bystrov

Reputation: 3056

There are two mechanisms for passing variables in Airflow:

  • (1) Jinja templating
  • (2) Specialized operator properties

With approach (1), variables can be passed via the user_defined_macros property at the DAG level. With approach (2), you should look at the specific operator's properties.

Note that some operator properties are processed by Jinja, so you can use template syntax in them.

Here is a working example:

from datetime import timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'custom_key1': 'custom_value1',
    'custom_key2': 'custom_value2'
}

dag = DAG(
    'tutorial',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
    user_defined_macros=default_args,
)

bash_command = """
    echo "access via DAG's user_defined_macros = {{ custom_key1 }}"
    echo "access via Operator's params = {{ params.custom_key2 }}"
"""

t1 = BashOperator(
    task_id='print_in_bash_op',
    bash_command=bash_command,
    params=default_args,
    dag=dag,
)

def myfunc(**context):
    print(context['templates_dict']['custom_key1'])
    print(context['templates_dict']['custom_key2'])


t2 = PythonOperator(
    task_id='print_in_python_op',
    python_callable=myfunc, 
    templates_dict=default_args,
    provide_context=True,
    dag=dag,
)

templates_dict={
    'custom_key1': '{{ custom_key1 }}',
    'custom_key2': '{{ custom_key2 }}'
}

t3 = PythonOperator(
    task_id='print_in_python_op_2',
    python_callable=myfunc, 
    templates_dict=templates_dict,
    provide_context=True,
    dag=dag,
)

t1 >> t2 >> t3
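Under the hood, Airflow renders these templated strings with Jinja. Here is a standalone sketch of just the rendering step, using the jinja2 package directly outside of Airflow (the mapping keys are the ones from the example above):

```python
from jinja2 import Template

# Stand-in for user_defined_macros: each key becomes a name
# that is visible inside the template.
macros = {'custom_key1': 'custom_value1', 'custom_key2': 'custom_value2'}

# What Airflow does, in essence, for each templated field:
rendered = Template(
    "access via DAG's user_defined_macros = {{ custom_key1 }}"
).render(**macros)
print(rendered)  # access via DAG's user_defined_macros = custom_value1
```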

Addition based on comments

Whether variables can be used depends entirely on the operator.

With approach (2), there are typically specialized properties for passing information, such as:

  • bash_command in BashOperator,
  • op_kwargs in PythonOperator,
  • sql in BigQueryOperator

For approach (1) to work, a property must be rendered with Jinja in the operator's code (such properties are marked as templated in the documentation). For instance, the properties above are all templated.

Anywhere Airflow macros can be used, user variables (defined via user_defined_macros) can also be used.
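For op_kwargs specifically, the operator simply calls the callable with those values as keyword arguments, so entries of default_args can be passed through directly. A minimal sketch of that mechanism without any Airflow machinery (the callable name here is hypothetical):

```python
default_args = {'touchfile_path': '/user/myname/touchfiles/'}

def create_touchfile(touchfile_path, **kwargs):
    # In a real task this would write the touchfile via a hook;
    # here we just return the path for illustration.
    return touchfile_path

# What PythonOperator does with op_kwargs, in essence:
op_kwargs = {'touchfile_path': default_args['touchfile_path']}
print(create_touchfile(**op_kwargs))  # /user/myname/touchfiles/
```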

Upvotes: 6

genhernandez

Reputation: 472

The default args should be available in *args. Have you tried accessing touchfile_path from there?

Upvotes: 0

jhnclvr

Reputation: 9477

Since they are defined in the same file at the same (module) level, you can do something like:

def webhdfs_touchfile_create(ds, *args, **kwargs):
    web_hdfs_hook = WebHDFSHook('webhdfs_default')
    client = web_hdfs_hook.get_conn()
    client.write("/user/myname/airflow_hdfs","stringToWrite")
    pp.pprint(kwargs) 
    pp.pprint(default_args['touchfile_path'])
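This works for the plain-Python reason that the callable resolves default_args through the module's global namespace at call time; no Airflow machinery is involved. A stripped-down sketch of just that behavior:

```python
# Module-level dict, as in the question's DAG file.
default_args = {'touchfile_path': '/user/myname/touchfiles/'}

def webhdfs_touchfile_create(ds, *args, **kwargs):
    # `default_args` is looked up in the module globals when the
    # function runs, which is why module-level access works.
    return default_args['touchfile_path']

print(webhdfs_touchfile_create('2017-05-15'))  # /user/myname/touchfiles/
```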

Upvotes: 1
