Reputation: 123
I need help in passing parameters (xcom pushed from previous task), to a SQL query in a .sql file. However, I am unable to do so using the "parameters" option, even though this option is able to render xcom values from previous task. Let me know what wrong am I doing.
Thanks :)
start = EmptyOperator(
task_id="start",
)
fetch_cust_id = PythonOperator(
task_id = "fetch",
python_callable = lambda: 'C001',
)
update_orders = MySqlOperator(
task_id="update",
mysql_conn_id="mysql_default",
database="my_db",
sql="/update.sql",
parameters={
"custid": "{{ ti.xcom_pull(task_ids='fetch') }}"
}
)
start >> fetch_cust_id >> update_orders
SQL file(update.sql):
UPDATE orders
SET placed = 'yes'
WHERE
custid = {{ custid }}
;
:(
Upvotes: 2
Views: 1915
Reputation: 15981
The parameters
is used to pass "variables" to SqlAlchemy engine. In this case the rendering is not done in Airflow engine. If you want to use this you need to use SqlAlchemy syntax.
Example:
sql="SELECT * FROM pet WHERE birth_date BETWEEN SYMMETRIC %(begin_date)s AND %(end_date)s",
parameters={"begin_date": "2020-01-01", "end_date": "2020-12-31"},
But in your case you want to template xcom so there is no reason to use parameters
at all. You want the rendering to be done by Airflow.
You can just set it directly in the sql
since sql is a template field:
UPDATE orders
SET placed = 'yes'
WHERE custid = "{{ ti.xcom_pull(task_ids='fetch') }}";
Upvotes: 6