sarit kumar
sarit kumar

Reputation: 123

How to render a .sql file with parameters in MySqlOperator in Airflow?

I need help in passing parameters (xcom pushed from previous task), to a SQL query in a .sql file. However, I am unable to do so using the "parameters" option, even though this option is able to render xcom values from previous task. Let me know what wrong am I doing.

Thanks :)

start = EmptyOperator(
            task_id="start",
    )

fetch_cust_id = PythonOperator(
    task_id = "fetch",
    python_callable = lambda: 'C001',
)

update_orders = MySqlOperator(
    task_id="update",
    mysql_conn_id="mysql_default",
    database="my_db",
    sql="/update.sql",
    parameters={
        "custid": "{{ ti.xcom_pull(task_ids='fetch') }}"
    }
)

start >> fetch_cust_id >> update_orders

SQL file(update.sql):

UPDATE orders
SET placed = 'yes'
WHERE
custid = {{ custid }}
;

:(

enter image description here

Upvotes: 2

Views: 1915

Answers (1)

Elad Kalif
Elad Kalif

Reputation: 15981

The parameters is used to pass "variables" to SqlAlchemy engine. In this case the rendering is not done in Airflow engine. If you want to use this you need to use SqlAlchemy syntax. Example:

sql="SELECT * FROM pet WHERE birth_date BETWEEN SYMMETRIC %(begin_date)s AND %(end_date)s",
parameters={"begin_date": "2020-01-01", "end_date": "2020-12-31"},

But in your case you want to template xcom so there is no reason to use parameters at all. You want the rendering to be done by Airflow.

You can just set it directly in the sql since sql is a template field:

UPDATE orders
SET placed = 'yes'
WHERE custid = "{{ ti.xcom_pull(task_ids='fetch') }}";

Upvotes: 6

Related Questions