Arpit Pruthi
Arpit Pruthi

Reputation: 169

Passing macros value to sql file in airflow

I have a sql file, having a sql query :-

delete from xyz where id in = 3 and time = '{{ execution_date.subtract(hours=2).strftime("%Y-%m-%d %H:%M:%S") }}';

Here I am writing macro in sql query itself, I want to pass it's value from python file where the operator is calling this sql query.

time = f'\'{{{{ execution_date.subtract(hours= {value1}).strftime("%Y-%m-%d %H:%M:%S") }}}}\''

I want to pass this global time variable to sql file instead of writing the complete macro there again.

PostgresOperator(dag=dag,
                 task_id='delete_entries', 
                 postgres_conn_id='database_connection',
                 sql='sql/delete_entry.sql')

if I use time in query using jinja template as {{ time }}, instead of evaluating it, it is passed as a complete string only. Please help, stuck on this for long.

Upvotes: 3

Views: 2407

Answers (1)

Elad Kalif
Elad Kalif

Reputation: 16139

Since you want to use f'\'{{{{ execution_date.subtract(hours= {value1}).strftime("%Y-%m-%d %H:%M:%S") }}}}\'' in two operators without duplicating the code you can define it as user macro.

from datetime import datetime
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator


def ds_macro_format(execution_date, hours):
    return execution_date.subtract(hours=hours).strftime("%Y-%m-%d %H:%M:%S")


user_macros = {
    'format': ds_macro_format
}

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2021, 6, 7),
}

dag = DAG(
    "stackoverflow_question1",
    default_args=default_args,
    schedule_interval="@daily",
    user_defined_macros=user_macros
)

PostgresOperator(dag=dag,
                 task_id='delete_entries',
                 postgres_conn_id='database_connection',
                 sql='sql/delete_entry.sql')

and the delete_entry.sql as:

delete from xyz where id in = 3 and time = {{ format(execution_date, hours=2) }};

enter image description here

Lets say you want also to use the macro in BashOperator you can do:

BashOperator(
    task_id='bash_task',
    bash_command='echo {{ format(execution_date, hours=2) }} ',
    dag=dag,
)

enter image description here

Upvotes: 1

Related Questions