Reputation: 632
I am trying to pass the params in postgres operator, in a dynamic way.
There are two tasks in order to refresh the metadata,
get list of id (get_query_id_task)
pass the list of ids to get and execute the query ( get_query_text_task)
get_query_id_task = PythonOperator(
task_id='get_query_id',
python_callable=query_and_push,
#provide_context=True,
op_kwargs={
'sql' : read_sql('warmupqueryid.sql')
}
)
get_query_text_task= PostgresOperator(
task_id='get_query_text',
postgres_conn_id='redshift',
trigger_rule=TriggerRule.ALL_DONE,
params={'query_ids': " {{ ti.xcom_pull(task_ids='get_query_id_task', key='return_value') }}"},
sql="""SELECT LISTAGG(CASE WHEN LEN (RTRIM(TEXT)) = 0 THEN TEXT ELSE RTRIM(TEXT) END,'') within group(ORDER BY SEQUENCE) AS TEXT
FROM stl_querytext
WHERE query in {{ macros.custom_macros.render_list_sql(params.query_ids) }};""",
)
Xcom push return the list of queries as below:
[(19343160,), (19350561,), (19351381,), (19351978,), (19356674,), (19356676,), (19356678,), (19356681,), (19356682,), (19359607,)]
I have used plugin to render the xcom push:
def render_list_sql(li):
l = []
for index, tup in enumerate(li):
idd = tup[0]
l.append(idd)
return tuple(l)
# Defining the plugin class
class AirflowTestPlugin(AirflowPlugin):
name = "custom_macros"
macros = [render_list_sql]
Template render from the first task:
Template rendered doesn't pass the parameter
Xcom push value, is list of tuples
The problem has been solved using the provided solution in below.However I couldn't make a loop over the list of ids. So it's just showing me one id. and I am not sure how to do loop over the ids.
Here is the log:
*** Reading remote log from s3://ob-airflow-pre/logs/Redshift_warm-up/get_query_text/2021-05-27T20:31:05.036338+00:00/1.log.
[2021-05-27 20:31:07,435] {taskinstance.py:877} INFO - Dependencies all met for <TaskInstance: Redshift_warm-up.get_query_text 2021-05-27T20:31:05.036338+00:00 [queued]>
[2021-05-27 20:31:07,463] {taskinstance.py:877} INFO - Dependencies all met for <TaskInstance: Redshift_warm-up.get_query_text 2021-05-27T20:31:05.036338+00:00 [queued]>
[2021-05-27 20:31:07,463] {taskinstance.py:1068} INFO -
--------------------------------------------------------------------------------
[2021-05-27 20:31:07,463] {taskinstance.py:1069} INFO - Starting attempt 1 of 2
[2021-05-27 20:31:07,463] {taskinstance.py:1070} INFO -
--------------------------------------------------------------------------------
[2021-05-27 20:31:07,473] {taskinstance.py:1089} INFO - Executing <Task(PostgresOperator): get_query_text> on 2021-05-27T20:31:05.036338+00:00
[2021-05-27 20:31:07,476] {standard_task_runner.py:52} INFO - Started process 384 to run task
[2021-05-27 20:31:07,479] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'Redshift_warm-up', 'get_query_text', '2021-05-27T20:31:05.036338+00:00', '--job-id', '2045', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/redshift_warm-up_dag.py', '--cfg-path', '/tmp/tmp8n32exly', '--error-file', '/tmp/tmp0bdhn3lj']
[2021-05-27 20:31:07,479] {standard_task_runner.py:77} INFO - Job 2045: Subtask get_query_text
[2021-05-27 20:31:07,645] {logging_mixin.py:104} INFO - Running <TaskInstance: Redshift_warm-up.get_query_text 2021-05-27T20:31:05.036338+00:00 [running]> on host airflow-worker-1.airflow-worker.airflow.svc.cluster.local
[2021-05-27 20:31:07,747] {taskinstance.py:1281} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=Redshift_warm-up
AIRFLOW_CTX_TASK_ID=get_query_text
AIRFLOW_CTX_EXECUTION_DATE=2021-05-27T20:31:05.036338+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-05-27T20:31:05.036338+00:00
[2021-05-27 20:31:07,748] {postgres.py:69} INFO - Executing: SELECT LISTAGG(CASE WHEN LEN (RTRIM(TEXT)) = 0 THEN TEXT ELSE RTRIM(TEXT) END,'') within group(ORDER BY SEQUENCE) AS TEXT
FROM stl_querytext
WHERE query in (19343160);
[2021-05-27 20:31:07,767] {base.py:69} INFO - Using connection to: id: redshift. Host: sys-redshift-pre.oneboxtickets.net, Port: 5439, Schema: reports, Login: mstr_new, Password: XXXXXXXX, extra: XXXXXXXX
[2021-05-27 20:31:07,792] {dbapi.py:180} INFO - Running statement: SELECT LISTAGG(CASE WHEN LEN (RTRIM(TEXT)) = 0 THEN TEXT ELSE RTRIM(TEXT) END,'') within group(ORDER BY SEQUENCE) AS TEXT
FROM stl_querytext
WHERE query in (19343160);, parameters: None
[2021-05-27 20:31:08,727] {dbapi.py:186} INFO - Rows affected: 1
[2021-05-27 20:31:08,759] {taskinstance.py:1185} INFO - Marking task as SUCCESS. dag_id=Redshift_warm-up, task_id=get_query_text, execution_date=20210527T203105, start_date=20210527T203107, end_date=20210527T203108
[2021-05-27 20:31:08,806] {taskinstance.py:1246} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2021-05-27 20:31:08,814] {local_task_job.py:146} INFO - Task exited with return code 0
Upvotes: 1
Views: 2392
Reputation: 18844
params
argument is not "Templated", so it would only render strings. So move your param
directly to SQL
get_query_text_task= PostgresOperator(
task_id='get_query_text',
postgres_conn_id='redshift',
trigger_rule=TriggerRule.ALL_DONE,
sql="""SELECT LISTAGG(CASE WHEN LEN (RTRIM(TEXT)) = 0 THEN TEXT ELSE RTRIM(TEXT) END,'') within group(ORDER BY SEQUENCE) AS TEXT
FROM stl_querytext
WHERE query in ({{ macros.custom_macros.render_list_sql( ti.xcom_pull(task_ids='get_query_id_task', key='return_value') }});""",
)
Upvotes: 4