Reputation: 1266
I am trying to run a SQL script from Airflow, and it is failing with a template error.
The script runs a query against Athena and loads the result into a Redshift table.
The SQL file is located at: redshift/sql/public/flow/KN_AWS_RE_ShipmentData_dup_update.sql
My Airflow Code
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator
from utils import FAILURE_EMAILS
yesterday = datetime.combine(datetime.today() - timedelta(1), datetime.min.time())
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': yesterday,
'email': FAILURE_EMAILS,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
dag = DAG('sample_sql', default_args=default_args, schedule_interval='0 4 * * *')
execute_notebook = PostgresOperator(
task_id='sample_sql',
postgres_conn_id='REDSHIFT_CONN',
sql="redshift/sql/public/flow/KN_AWS_RE_ShipmentData_dup_update.sql",
params={'limit': '50'},
dag=dag
)
Error
[2020-04-14 02:19:24,412] {{standard_task_runner.py:52}} INFO - Started process 23012 to run task
[2020-04-14 02:19:24,482] {{logging_mixin.py:112}} INFO - [2020-04-14 02:19:24,481] {{dagbag.py:403}} INFO - Filling up the DagBag from /usr/local/airflow/dags/Scripts/Sample.py
[2020-04-14 02:19:24,495] {{baseoperator.py:807}} ERROR - KN_AWS_RE_ShipmentData_dup_update.sql
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 805, in resolve_template_files
setattr(self, field, env.loader.get_source(env, content)[0])
File "/usr/local/lib/python3.7/site-packages/jinja2/loaders.py", line 187, in get_source
raise TemplateNotFound(template)
jinja2.exceptions.TemplateNotFound: KN_AWS_RE_ShipmentData_dup_update.sql
[2020-04-14 02:19:24,545] {{logging_mixin.py:112}} INFO - Running %s on host %s <TaskInstance: sample_sql.sample_sql 2020-04-14T02:14:08.020072+00:00 [running]> 0ca54c719ff7
[2020-04-14 02:19:24,583] {{taskinstance.py:1088}} ERROR - KN_AWS_RE_ShipmentData_dup_update.sql
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 940, in _run_raw_task
self.render_templates(context=context)
How do I fix this issue?
Upvotes: 0
Views: 3086
Reputation: 1758
You have to specify the path to the .sql template file when instantiating the DAG, using the template_searchpath argument. By default, Jinja only looks in your DAG folder.
Note that your DAG contains one bad practice: a dynamic start_date. The start_date should be fixed (e.g. datetime(2020, 4, 13)) rather than dynamic (e.g. datetime.now()). You can read more about it here.
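Concretely, the only change needed in default_args is:

# Bad: re-evaluated every time the scheduler parses the DAG file
'start_date': datetime.combine(datetime.today() - timedelta(1), datetime.min.time()),

# Good: a fixed date in the past
'start_date': datetime(2020, 4, 13),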
That said, I would change the DAG definition to this:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator
from utils import FAILURE_EMAILS
# Remove this
# yesterday = datetime.combine(datetime.today() - timedelta(1), datetime.min.time())
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2020, 4, 13), # Change this
'email': FAILURE_EMAILS,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
dag = DAG(
'sample_sql',
default_args=default_args,
schedule_interval='0 4 * * *',
template_searchpath='/redshift/sql/public/flow/')
execute_notebook = PostgresOperator(
task_id='sample_sql',
postgres_conn_id='REDSHIFT_CONN',
sql='KN_AWS_RE_ShipmentData_dup_update.sql',
params={'limit': '50'},
dag=dag
)
execute_notebook # Tell Airflow the task dependencies; in this case there are none
Of course you should choose the correct absolute base path to assign to template_searchpath, so something like /home/redshift/sql/public/flow.
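If the redshift/ directory lives next to your DAG files, a minimal sketch is to build the search path from the DAG file's own location (the exact layout here is an assumption about your project structure); note that template_searchpath also accepts a list of directories:

import os

# Assumed layout: the redshift/ folder sits next to this DAG file
SQL_DIR = os.path.join(os.path.dirname(__file__), 'redshift', 'sql', 'public', 'flow')

dag = DAG(
    'sample_sql',
    default_args=default_args,
    schedule_interval='0 4 * * *',
    # template_searchpath takes a single path or a list of paths
    template_searchpath=[SQL_DIR],
)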
Upvotes: 2