Reputation: 35
I'm trying to pass bar.sql
through the PythonOperator's template_dict for use in the python_callable, like the docs mention, but this is the closest example I've found. I've also reviewed this question which references Airflow 1.8, but the solution did not work for me in practice - I'm using Airflow 2.2.4.
(Also, there seems to be a well known BashOperator issue (question and docs references) where TemplateNotFound errors are common. For the BashOperator, you can troubleshoot by changing command='script.sh'
to command='script.sh '
, but I did not have any such luck using this with my .sql file passed to PythonOperator's template_dict.)
My task below is resulting in the logs raising an error TemplateNotFound(template): bar.sql
with DAG(
'bigquery-dag',
default_args=default_args,
schedule_interval=timedelta(days=1),
start_date=datetime(2021, 1, 1),
catchup=False,
template_searchpath=['usr/local/airflow/include']
) as dag:
start = DummyOperator(
task_id='start',
on_success_callback=some_other_function
)
t1 = PythonOperator(
task_id='sql_printer',
python_callable=sqlPrinter,
templates_dict={'sql': 'bar.sql'},
templates_exts=['.sql',],
provide_context=True
)
start >> t1
My goal is for bar.sql
to be available for use in sqlPrinter
-- ~/include/bar.sql
select 'hello world'
def sqlPrinter(**context):
print(f"sql: {context['templates_dict']['sql']}")
The result I would like to see is
>>> sql: select 'hello world'
Below is the DAG and error log from sql_printer
the log results.
[2022-04-04, 22:32:29 ] {taskinstance.py:1264} INFO - Executing <Task(PythonOperator): sql_printer> on 2022-04-05 03:32:27.076984+00:00
[2022-04-04, 22:32:29 ] {standard_task_runner.py:52} INFO - Started process 15289 to run task
[2022-04-04, 22:32:29 ] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'bigquery-dag', 'sql_printer', 'manual__2022-04-05T03:32:27.076984+00:00', '--job-id', '456', '--raw', '--subdir', 'DAGS_FOLDER/bigquery-dag.py', '--cfg-path', '/tmp/tmp0fjl_t2a', '--error-file', '/tmp/tmpuy00moli']
[2022-04-04, 22:32:29 ] {standard_task_runner.py:77} INFO - Job 456: Subtask sql_printer
[2022-04-04, 22:32:29 ] {logging_mixin.py:109} INFO - Running <TaskInstance: bigquery-dag.sql_printer manual__2022-04-05T03:32:27.076984+00:00 [running]> on host 1296ec2abf88
[2022-04-04, 22:32:30 ] {taskinstance.py:1718} ERROR - Task failed with exception
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1334, in _run_raw_task
self._execute_task_with_callbacks(context)
File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1423, in _execute_task_with_callbacks
self.render_templates(context=context)
File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 2011, in render_templates
self.task.render_template_fields(context)
File "/usr/local/lib/python3.9/site-packages/airflow/models/baseoperator.py", line 1061, in render_template_fields
self._do_render_template_fields(self, self.template_fields, context, jinja_env, set())
File "/usr/local/lib/python3.9/site-packages/airflow/models/baseoperator.py", line 1074, in _do_render_template_fields
rendered_content = self.render_template(content, context, jinja_env, seen_oids)
File "/usr/local/lib/python3.9/site-packages/airflow/models/baseoperator.py", line 1131, in render_template
return {key: self.render_template(value, context, jinja_env) for key, value in content.items()}
File "/usr/local/lib/python3.9/site-packages/airflow/models/baseoperator.py", line 1131, in <dictcomp>
return {key: self.render_template(value, context, jinja_env) for key, value in content.items()}
File "/usr/local/lib/python3.9/site-packages/airflow/models/baseoperator.py", line 1108, in render_template
template = jinja_env.get_template(content)
File "/usr/local/lib/python3.9/site-packages/jinja2/environment.py", line 997, in get_template
return self._load_template(name, globals)
File "/usr/local/lib/python3.9/site-packages/jinja2/environment.py", line 958, in _load_template
template = self.loader.load(self, name, self.make_globals(globals))
File "/usr/local/lib/python3.9/site-packages/jinja2/loaders.py", line 125, in load
source, filename, uptodate = self.get_source(environment, name)
File "/usr/local/lib/python3.9/site-packages/jinja2/loaders.py", line 214, in get_source
raise TemplateNotFound(template)
jinja2.exceptions.TemplateNotFound: bar.sql
Upvotes: 0
Views: 1397
Reputation: 71
try to use templates_dict={'query': 'bar.sql'}
t1 = PythonOperator(
task_id='sql_printer',
python_callable=sqlPrinter,
templates_dict={'query': 'bar.sql'},
provide_context=True
)
def sqlPrinter(**context):
print(f"sql: {context['templates_dict']['query']}")
the idea is came from this post
Upvotes: 0