Reputation: 169
I have a SQL file containing this query:
delete from xyz where id = 3 and time = '{{ execution_date.subtract(hours=2).strftime("%Y-%m-%d %H:%M:%S") }}';
Here I am writing the macro in the SQL query itself, but I want to pass its value from the Python file where the operator calls this SQL file:
time = f'\'{{{{ execution_date.subtract(hours= {value1}).strftime("%Y-%m-%d %H:%M:%S") }}}}\''
I want to pass this global time variable to the SQL file instead of writing the complete macro there again.
PostgresOperator(
    dag=dag,
    task_id='delete_entries',
    postgres_conn_id='database_connection',
    sql='sql/delete_entry.sql')
If I reference time in the query as a Jinja expression ({{ time }}), it is passed through as a literal string instead of being evaluated. Please help, I have been stuck on this for long.
Upvotes: 3
Views: 2407
Reputation: 16139
Since you want to use f'\'{{{{ execution_date.subtract(hours= {value1}).strftime("%Y-%m-%d %H:%M:%S") }}}}\'' in two operators without duplicating the code, you can define it as a user-defined macro.
from datetime import datetime
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator

def ds_macro_format(execution_date, hours):
    return execution_date.subtract(hours=hours).strftime("%Y-%m-%d %H:%M:%S")

# expose ds_macro_format to Jinja templates as format(...)
user_macros = {
    'format': ds_macro_format
}

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2021, 6, 7),
}

dag = DAG(
    "stackoverflow_question1",
    default_args=default_args,
    schedule_interval="@daily",
    user_defined_macros=user_macros
)

PostgresOperator(
    dag=dag,
    task_id='delete_entries',
    postgres_conn_id='database_connection',
    sql='sql/delete_entry.sql')
and the delete_entry.sql
as:
delete from xyz where id = 3 and time = '{{ format(execution_date, hours=2) }}';
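Under the hood, Airflow renders the .sql file with Jinja and injects user_defined_macros into the template context. A minimal sketch with plain jinja2 (the template string mirrors delete_entry.sql, and the macro is rewritten with timedelta since execution_date is only a pendulum object inside Airflow):

```python
from datetime import datetime, timedelta
from jinja2 import Template

def ds_macro_format(execution_date, hours):
    # timedelta stand-in for pendulum's subtract(hours=...)
    return (execution_date - timedelta(hours=hours)).strftime("%Y-%m-%d %H:%M:%S")

sql = "delete from xyz where id = 3 and time = '{{ format(execution_date, hours=2) }}';"

# same mechanism Airflow uses: the macro is just a callable in the template context
rendered = Template(sql).render(format=ds_macro_format,
                                execution_date=datetime(2021, 6, 7, 12, 0))
print(rendered)
# delete from xyz where id = 3 and time = '2021-06-07 10:00:00';
```

This also shows why {{ time }} stayed a literal string in the question: Jinja only substitutes the variable's value, it does not render the macro text contained inside it a second time.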
Let's say you also want to use the macro in a BashOperator; you can do:
BashOperator(
    task_id='bash_task',
    bash_command='echo {{ format(execution_date, hours=2) }} ',
    dag=dag,
)
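The macro itself is easy to verify outside Airflow. A quick check with a plain-datetime stand-in (subtraction via timedelta, since execution_date is a pendulum object only inside a running DAG):

```python
from datetime import datetime, timedelta

def ds_macro_format_plain(execution_date, hours):
    # timedelta stand-in for pendulum's subtract(hours=...)
    return (execution_date - timedelta(hours=hours)).strftime("%Y-%m-%d %H:%M:%S")

print(ds_macro_format_plain(datetime(2021, 6, 7, 12, 0), hours=2))
# 2021-06-07 10:00:00
```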
Upvotes: 1