dlamblin

Reputation: 45321

Airflow using template files for PythonOperator

Getting a BashOperator or SqlOperator to pick up an external file for its template is reasonably well documented, but with the PythonOperator my test of what I understood from the docs is not working. I am not sure how the templates_exts and templates_dict parameters are supposed to interact to pick up a file.

In my dags folder I've created: pyoptemplate.sql and pyoptemplate.t as well as test_python_operator_template.py:

pyoptemplate.sql:

SELECT * FROM {{params.table}};

pyoptemplate.t:

SELECT * FROM {{params.table}};

test_python_operator_template.py:

# coding: utf-8
# vim:ai:si:et:sw=4 ts=4 tw=80
"""
# A Test of Templates in PythonOperator
"""

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

import pprint

pp = pprint.PrettyPrinter(indent=4)


def templated_function(ds, **kwargs):
    """This function will try to use templates loaded from external files"""
    pp.pprint(ds)
    pp.pprint(kwargs)


# Define the DAG
dag = DAG(dag_id='test_python_operator_template_dag',
          default_args={"owner": "lamblin",
                        "start_date": datetime.now()},
          template_searchpath=['/Users/daniellamblin/airflow/dags'],
          schedule_interval='@once')


# Define the single task in this controller example DAG
op = PythonOperator(task_id='test_python_operator_template',
                    provide_context=True,
                    python_callable=templated_function,
                    templates_dict={
                        'pyoptemplate': '',
                        'pyoptemplate.sql': '',
                        'sql': 'pyoptemplate',
                        'file1':'pyoptemplate.sql',
                        'file2':'pyoptemplate.t',
                        'table': '{{params.table}}'},
                    templates_exts=['.sql','.t'],
                    params={'condition_param': True,
                            'message': 'Hello World',
                            'table': 'TEMP_TABLE'},
                    dag=dag)

The result from a run shows that table was templated correctly as a string, but the others did not pull in any files for templating.

dlamblin$ airflow test test_python_operator_template_dag test_python_operator_template 2017-01-18
[2017-01-18 23:58:06,698] {__init__.py:36} INFO - Using executor SequentialExecutor
[2017-01-18 23:58:07,342] {models.py:154} INFO - Filling up the DagBag from /Users/daniellamblin/airflow/dags
[2017-01-18 23:58:07,620] {models.py:1196} INFO - 
--------------------------------------------------------------------------------
Starting attempt 1 of 1
--------------------------------------------------------------------------------

[2017-01-18 23:58:07,620] {models.py:1219} INFO - Executing <Task(PythonOperator): test_python_operator_template> on 2017-01-18 00:00:00
'2017-01-18'
{   u'END_DATE': '2017-01-18',
    u'conf': <module 'airflow.configuration' from '/Library/Python/2.7/site-packages/airflow/configuration.pyc'>,
    u'dag': <DAG: test_python_operator_template_dag>,
    u'dag_run': None,
    u'ds_nodash': u'20170118',
    u'end_date': '2017-01-18',
    u'execution_date': datetime.datetime(2017, 1, 18, 0, 0),
    u'latest_date': '2017-01-18',
    u'macros': <module 'airflow.macros' from '/Library/Python/2.7/site-packages/airflow/macros/__init__.pyc'>,
    u'params': {   'condition_param': True,
                   'message': 'Hello World',
                   'table': 'TEMP_TABLE'},
    u'run_id': None,
    u'tables': None,
    u'task': <Task(PythonOperator): test_python_operator_template>,
    u'task_instance': <TaskInstance: test_python_operator_template_dag.test_python_operator_template 2017-01-18 00:00:00 [running]>,
    u'task_instance_key_str': u'test_python_operator_template_dag__test_python_operator_template__20170118',
    'templates_dict': {   'file1': u'pyoptemplate.sql',
                          'file2': u'pyoptemplate.t',
                          'pyoptemplate': u'',
                          'pyoptemplate.sql': u'',
                          'sql': u'pyoptemplate',
                          'table': u'TEMP_TABLE'},
    u'test_mode': True,
    u'ti': <TaskInstance: test_python_operator_template_dag.test_python_operator_template 2017-01-18 00:00:00 [running]>,
    u'tomorrow_ds': '2017-01-19',
    u'tomorrow_ds_nodash': u'20170119',
    u'ts': '2017-01-18T00:00:00',
    u'ts_nodash': u'20170118T000000',
    u'yesterday_ds': '2017-01-17',
    u'yesterday_ds_nodash': u'20170117'}
[2017-01-18 23:58:07,634] {python_operator.py:67} INFO - Done. Returned value was: None

Upvotes: 25

Views: 38164

Answers (5)

lschmidt90

Reputation: 368

With Airflow 2 the solution might look a bit simpler. I will also give an example of how to use this idea to fetch data from Hive (you can replace Hive with any database you want) with templating, putting the result directly into a pandas DataFrame. Let's assume you have, next to your dag.py file, a folder called sql with this .sql file in it:

select {{some_params}} from my_table;

Let's write a function that fetches all of the .sql files in that folder:

from pathlib import Path

def get_all_sql_files():
    # the sql folder sits next to this dag.py file
    path = Path(__file__).parent / "sql"
    return list(path.iterdir())

Usually with Python you want the data you're fetching to end up in a pandas DataFrame, so let's write a function for that:

import pandas as pd
from airflow.providers.apache.hive.hooks.hive import HiveServer2Hook

def database_to_pandas(sql: str, your_conf: dict, schema="default"):
    hook = HiveServer2Hook(hiveserver2_conn_id="your_connection")
    res = hook.get_results(sql, schema=schema, hive_conf=your_conf)
    df = pd.DataFrame(res['data'])
    return df

And let's write another function that just reads the .sql file as text and uses the function above to create a pandas DataFrame:

def get_data_from_database(query: str, your_conf: dict):
    sql = Path(query).read_text()
    return database_to_pandas(sql, your_conf)

Then all you have to do is provide the PythonOperator with two arguments, provide_context and templates_dict:

your_python_task = PythonOperator(
    task_id="select_data_from_hive",
    provide_context=True,
    templates_dict=your_settings,
    python_callable=your_func,
)

Within your_settings you can define whatever params you want to use, including Jinja templating:

your_settings = {
    "YESTERDAY_NODASH": "{{ ds_nodash }}",
    "some_params": "some_value",
}
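
Because templates_dict is itself a templated field, Airflow renders these values with Jinja before your callable ever sees them. Roughly, something like the following happens (a plain-jinja2 approximation with a hand-built context, not Airflow's actual rendering code):

from jinja2 import Template

# Stand-in for the execution context Airflow builds at run time.
fake_context = {"ds_nodash": "20170118"}

your_settings = {
    "YESTERDAY_NODASH": "{{ ds_nodash }}",
    "some_params": "some_value",
}

rendered = {k: Template(v).render(**fake_context) for k, v in your_settings.items()}
print(rendered)  # {'YESTERDAY_NODASH': '20170118', 'some_params': 'some_value'}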

In order to have your script replace everything defined in your .sql file (like your {{some_params}}), you just need to pass the templates_dict via the context of your_func, like so:

def your_func(**context):
    your_conf=context["templates_dict"]
    query_list = get_all_sql_files()
    for query in query_list:
        get_data_from_database(query, your_conf)

This will make your .py file replace everything in your .sql files that you defined in your your_conf dictionary.

Upvotes: 1

P. Xie

Reputation: 288

Recently I came across the same issue and finally solved it. @Ardan's solution is correct, but I want to repeat it as a more complete answer, with some details on how Airflow works for newcomers.

Of course you first need something like this:

from airflow.operators.python_operator import PythonOperator

class SQLTemplatedPythonOperator(PythonOperator):

    # somehow ('.sql',) doesn't work but tuple of two works...
    template_ext = ('.sql','.abcdefg')

Assuming you have a sql template file like below:

# stored at path: $AIRFLOW_HOME/sql/some.sql
select {{some_params}} from my_table;

First make sure you add your folder to the search path in your dag params.

Do not pass template_searchpath to args and then pass args to DAG!!!! It doesn't work.

dag = DAG(
    dag_id= "some_name",
    default_args=args,
    schedule_interval="@once",
    template_searchpath='/Users/your_name/some_path/airflow_home/sql'
)

Then your operator call will be

SQLTemplatedPythonOperator(
    templates_dict={'query': 'some.sql'},
    op_kwargs={"args_directly_passed_to_your_function": "some_value"},
    task_id='dummy',
    params={"some_params": "some_value"},
    python_callable=your_func,
    provide_context=True,
    dag=dag,
)

Your function will be:

def your_func(args_directly_passed_to_your_function=None, **context):
    query = context['templates_dict']['query']
    do_some_thing(query)

Some explanations:

  1. Airflow uses values from the context to render your template. To manually add values to the context, you can use the params field as above.

  2. The PythonOperator no longer picks up the template file extension from a template_ext field set in __init__, as @Ardan mentioned. If you look at the source code, it only takes the extension from self.__class__.template_ext.

  3. Airflow loops through the templates_dict field and, if value.endswith(file_extension) == True, it renders the template (see the sketch below).
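
To make point 3 concrete, here is a rough sketch of that extension check using plain jinja2. This is a simplified illustration, not the actual Airflow source; the searchpath, file name, and context values below are assumptions carried over from the example above.

from jinja2 import Environment, FileSystemLoader

def render_templates_dict(templates_dict, template_ext, jinja_env, context):
    """Values ending in a registered extension are loaded as template
    files from the searchpath; everything else is rendered in place."""
    rendered = {}
    for key, value in templates_dict.items():
        if isinstance(value, str) and value.endswith(tuple(template_ext)):
            template = jinja_env.get_template(value)     # e.g. 'some.sql' -> file contents
        else:
            template = jinja_env.from_string(str(value))
        rendered[key] = template.render(**context)
    return rendered

# Hypothetical usage mirroring the operator above; in Airflow the
# rendering context is built from the task instance at run time.
env = Environment(loader=FileSystemLoader('/Users/your_name/some_path/airflow_home/sql'))
print(render_templates_dict({'query': 'some.sql'},
                            ('.sql', '.abcdefg'),
                            env,
                            {'some_params': 'some_value'}))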

Upvotes: 20

Ardan

Reputation: 666

As of Airflow 1.8, the way the PythonOperator replaces its template_ext field in __init__ doesn't work. Tasks only check template_ext on the __class__. To create a PythonOperator that picks up SQL template files you only need to do the following:

class SQLTemplatedPythonOperator(PythonOperator):
    template_ext = ('.sql',)

And then to access the SQL from your task when it runs:

SQLTemplatedPythonOperator(
    templates_dict={'query': 'my_template.sql'},
    params={'my_var': 'my_value'},
    python_callable=my_func,
    provide_context=True,
)

def my_func(**context):
    query = context['templates_dict']['query']  # the rendered SQL text
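
Put together, a minimal runnable sketch of this approach might look like the following. The DAG id, start date, searchpath, and the print call are assumptions added for illustration; only the subclass and the templates_dict/params usage come from the answer above.

from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator


class SQLTemplatedPythonOperator(PythonOperator):
    # Declared on the class, so task rendering picks it up.
    template_ext = ('.sql',)


def my_func(**context):
    # Contents of my_template.sql, already rendered by Jinja.
    print(context['templates_dict']['query'])


dag = DAG(dag_id='sql_templated_python_example',      # assumed name
          start_date=datetime(2017, 1, 1),
          schedule_interval='@once',
          template_searchpath=['/path/to/your/sql'])  # assumed path

run_query = SQLTemplatedPythonOperator(
    task_id='render_sql',
    templates_dict={'query': 'my_template.sql'},
    params={'my_var': 'my_value'},
    python_callable=my_func,
    provide_context=True,
    dag=dag,
)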

Upvotes: 25

Saurabh Mishra

Reputation: 1713

I was unable to get a templated script file to work with the PythonOperator (I am new to Python), but here is an example with the BashOperator; maybe it can give you some hints.

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    #'start_date': airflow.utils.dates.days_ago(2),
    'email': ['[email protected]']}

dag = DAG('sr5', description='Simple tutorial DAG',
          schedule_interval='0 12 * * *',
          start_date=datetime(2017, 3, 20),
          catchup=False, #so that on scheduler restart, it doesn't try to catch up on all the missed runs
          template_searchpath=['/Users/my_name/Desktop/utils/airflow/resources'])

t1 = BashOperator(
    task_id='t1',
    depends_on_past=False,
    params={
        'ds1': 'hie'},
    bash_command="01.sh",
    dag=dag)

The 01.sh script looks as follows:

#!/bin/sh

echo {{ ds }}
echo {{ params.ds1 }}

This gives the following output on a test execution:

[2017-05-12 08:31:52,981] {bash_operator.py:91} INFO - Output:

[2017-05-12 08:31:52,984] {bash_operator.py:95} INFO - 2017-05-05

[2017-05-12 08:31:52,984] {bash_operator.py:95} INFO - hie

Upvotes: 1

Will Fitzgerald

Reputation: 1382

I don't think this is really possible. But the following workaround might be helpful:

def templated_function(ds, **kwargs):
    kwargs['ds'] = ds                                        # put ds into 'context'
    task = kwargs['task']                                    # get a handle on the task
    tmpl = open(kwargs['templates_dict']['file1']).read()    # read the template file
    sql = task.render_template('', tmpl, kwargs)             # render it
    pp.pprint(sql)

Would love a better solution, though!

Upvotes: 12
