newtothis
newtothis

Reputation: 35

Airflow PythonOperator template_dict raises error TemplateNotFound(template)

I'm trying to pass bar.sql through the PythonOperator's template_dict for use in the python_callable, like the docs mention, but this is the closest example I've found. I've also reviewed this question which references Airflow 1.8, but the solution did not work for me in practice - I'm using Airflow 2.2.4.

(Also, there seems to be a well known BashOperator issue (question and docs references) where TemplateNotFound errors are common. For the BashOperator, you can troubleshoot by changing command='script.sh' to command='script.sh ', but I did not have any such luck using this with my .sql file passed to PythonOperator's template_dict.)

My task below is resulting in the logs raising an error TemplateNotFound(template): bar.sql

with DAG(
    'bigquery-dag',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    template_searchpath=['usr/local/airflow/include']
) as dag:

    start = DummyOperator(
        task_id='start',
        on_success_callback=some_other_function
    )

    t1 = PythonOperator(
        task_id='sql_printer',
        python_callable=sqlPrinter,
        templates_dict={'sql': 'bar.sql'},
        templates_exts=['.sql',],
        provide_context=True
    )

    start >> t1

My goal is for bar.sql to be available for use in sqlPrinter

-- ~/include/bar.sql
select 'hello world'
def sqlPrinter(**context):
    print(f"sql: {context['templates_dict']['sql']}")

The result I would like to see is

>>> sql: select 'hello world'

Below is the DAG and error log from sql_printer the log results. dag

[2022-04-04, 22:32:29 ] {taskinstance.py:1264} INFO - Executing <Task(PythonOperator): sql_printer> on 2022-04-05 03:32:27.076984+00:00
[2022-04-04, 22:32:29 ] {standard_task_runner.py:52} INFO - Started process 15289 to run task
[2022-04-04, 22:32:29 ] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'bigquery-dag', 'sql_printer', 'manual__2022-04-05T03:32:27.076984+00:00', '--job-id', '456', '--raw', '--subdir', 'DAGS_FOLDER/bigquery-dag.py', '--cfg-path', '/tmp/tmp0fjl_t2a', '--error-file', '/tmp/tmpuy00moli']
[2022-04-04, 22:32:29 ] {standard_task_runner.py:77} INFO - Job 456: Subtask sql_printer
[2022-04-04, 22:32:29 ] {logging_mixin.py:109} INFO - Running <TaskInstance: bigquery-dag.sql_printer manual__2022-04-05T03:32:27.076984+00:00 [running]> on host 1296ec2abf88
[2022-04-04, 22:32:30 ] {taskinstance.py:1718} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1334, in _run_raw_task
    self._execute_task_with_callbacks(context)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1423, in _execute_task_with_callbacks
    self.render_templates(context=context)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 2011, in render_templates
    self.task.render_template_fields(context)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/baseoperator.py", line 1061, in render_template_fields
    self._do_render_template_fields(self, self.template_fields, context, jinja_env, set())
  File "/usr/local/lib/python3.9/site-packages/airflow/models/baseoperator.py", line 1074, in _do_render_template_fields
    rendered_content = self.render_template(content, context, jinja_env, seen_oids)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/baseoperator.py", line 1131, in render_template
    return {key: self.render_template(value, context, jinja_env) for key, value in content.items()}
  File "/usr/local/lib/python3.9/site-packages/airflow/models/baseoperator.py", line 1131, in <dictcomp>
    return {key: self.render_template(value, context, jinja_env) for key, value in content.items()}
  File "/usr/local/lib/python3.9/site-packages/airflow/models/baseoperator.py", line 1108, in render_template
    template = jinja_env.get_template(content)
  File "/usr/local/lib/python3.9/site-packages/jinja2/environment.py", line 997, in get_template
    return self._load_template(name, globals)
  File "/usr/local/lib/python3.9/site-packages/jinja2/environment.py", line 958, in _load_template
    template = self.loader.load(self, name, self.make_globals(globals))
  File "/usr/local/lib/python3.9/site-packages/jinja2/loaders.py", line 125, in load
    source, filename, uptodate = self.get_source(environment, name)
  File "/usr/local/lib/python3.9/site-packages/jinja2/loaders.py", line 214, in get_source
    raise TemplateNotFound(template)
jinja2.exceptions.TemplateNotFound: bar.sql

Upvotes: 0

Views: 1397

Answers (1)

Jasmine
Jasmine

Reputation: 71

try to use templates_dict={'query': 'bar.sql'}

t1 = PythonOperator(
        task_id='sql_printer',
        python_callable=sqlPrinter,
        templates_dict={'query': 'bar.sql'},
        provide_context=True
    )
def sqlPrinter(**context):
    print(f"sql: {context['templates_dict']['query']}")

the idea is came from this post

Upvotes: 0

Related Questions