pm1359

Reputation: 632

Airflow 2.0.2 How to pass parameter within postgres tasks using xcom?

I am trying to pass parameters to a PostgresOperator dynamically.

There are two tasks to refresh the metadata:

  1. get the list of query IDs (get_query_id_task)

  2. pass the list of IDs to fetch and execute the query text (get_query_text_task)

    get_query_id_task = PythonOperator(
        task_id='get_query_id',
        python_callable=query_and_push,
        op_kwargs={
            'sql': read_sql('warmupqueryid.sql')
        }
    )

    get_query_text_task = PostgresOperator(
        task_id='get_query_text',
        postgres_conn_id='redshift',
        trigger_rule=TriggerRule.ALL_DONE,
        params={'query_ids': "{{ ti.xcom_pull(task_ids='get_query_id', key='return_value') }}"},
        sql="""SELECT LISTAGG(CASE WHEN LEN(RTRIM(TEXT)) = 0 THEN TEXT ELSE RTRIM(TEXT) END, '') WITHIN GROUP (ORDER BY SEQUENCE) AS TEXT
               FROM stl_querytext
               WHERE query in {{ macros.custom_macros.render_list_sql(params.query_ids) }};""",
    )
    

The XCom push returns the list of query IDs as below:

[(19343160,), (19350561,), (19351381,), (19351978,), (19356674,), (19356676,), (19356678,), (19356681,), (19356682,), (19359607,)]

I have used a plugin macro to render the XCom value:

    def render_list_sql(li):
        l = []
        for index, tup in enumerate(li):
            idd = tup[0]
            l.append(idd)
        return tuple(l)


    # Defining the plugin class
    class AirflowTestPlugin(AirflowPlugin):
        name = "custom_macros"
        macros = [render_list_sql]
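For reference, the macro simply flattens the list of one-element tuples (cursor rows) into a flat tuple of IDs. A quick standalone check, plain Python with no Airflow required:

```python
def render_list_sql(li):
    # Flatten [(id,), (id,), ...] into a flat tuple of ids
    return tuple(tup[0] for tup in li)

xcom_value = [(19343160,), (19350561,), (19351381,)]
print(render_list_sql(xcom_value))  # (19343160, 19350561, 19351381)
```

Note that for a single-element list the tuple renders as (19343160,) with a trailing comma, which is not valid inside a SQL IN clause.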

Template render from the first task: (screenshot)

Template rendered doesn't pass the parameter: (screenshot)

XCom push value is a list of tuples: (screenshot)

The problem has been solved using the solution provided below. However, I couldn't loop over the list of IDs, so the rendered query only shows one ID, and I am not sure how to loop over all of them.
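One way to cover every ID rather than just one is a variant of the macro, sketched below under the assumption that the XCom value is the full list of tuples shown above, which joins the IDs into a comma-separated string for the IN clause. This also sidesteps the trailing comma a one-element Python tuple would render:

```python
def render_list_sql(li):
    # Hypothetical variant: join every id into one comma-separated string,
    # so the rendered IN clause covers the whole list
    return ", ".join(str(tup[0]) for tup in li)

ids = [(19343160,), (19350561,), (19351381,)]
clause = "WHERE query in ({})".format(render_list_sql(ids))
print(clause)  # WHERE query in (19343160, 19350561, 19351381)
```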

Here is the log:

*** Reading remote log from s3://ob-airflow-pre/logs/Redshift_warm-up/get_query_text/2021-05-27T20:31:05.036338+00:00/1.log.
[2021-05-27 20:31:07,435] {taskinstance.py:877} INFO - Dependencies all met for <TaskInstance: Redshift_warm-up.get_query_text 2021-05-27T20:31:05.036338+00:00 [queued]>
[2021-05-27 20:31:07,463] {taskinstance.py:877} INFO - Dependencies all met for <TaskInstance: Redshift_warm-up.get_query_text 2021-05-27T20:31:05.036338+00:00 [queued]>
[2021-05-27 20:31:07,463] {taskinstance.py:1068} INFO - 
--------------------------------------------------------------------------------
[2021-05-27 20:31:07,463] {taskinstance.py:1069} INFO - Starting attempt 1 of 2
[2021-05-27 20:31:07,463] {taskinstance.py:1070} INFO - 
--------------------------------------------------------------------------------
[2021-05-27 20:31:07,473] {taskinstance.py:1089} INFO - Executing <Task(PostgresOperator): get_query_text> on 2021-05-27T20:31:05.036338+00:00
[2021-05-27 20:31:07,476] {standard_task_runner.py:52} INFO - Started process 384 to run task
[2021-05-27 20:31:07,479] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'Redshift_warm-up', 'get_query_text', '2021-05-27T20:31:05.036338+00:00', '--job-id', '2045', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/redshift_warm-up_dag.py', '--cfg-path', '/tmp/tmp8n32exly', '--error-file', '/tmp/tmp0bdhn3lj']
[2021-05-27 20:31:07,479] {standard_task_runner.py:77} INFO - Job 2045: Subtask get_query_text
[2021-05-27 20:31:07,645] {logging_mixin.py:104} INFO - Running <TaskInstance: Redshift_warm-up.get_query_text 2021-05-27T20:31:05.036338+00:00 [running]> on host airflow-worker-1.airflow-worker.airflow.svc.cluster.local
[2021-05-27 20:31:07,747] {taskinstance.py:1281} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=Redshift_warm-up
AIRFLOW_CTX_TASK_ID=get_query_text
AIRFLOW_CTX_EXECUTION_DATE=2021-05-27T20:31:05.036338+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-05-27T20:31:05.036338+00:00
[2021-05-27 20:31:07,748] {postgres.py:69} INFO - Executing: SELECT LISTAGG(CASE WHEN LEN (RTRIM(TEXT)) = 0 THEN TEXT ELSE RTRIM(TEXT) END,'') within group(ORDER BY SEQUENCE) AS TEXT
               FROM stl_querytext
               WHERE query in (19343160);
[2021-05-27 20:31:07,767] {base.py:69} INFO - Using connection to: id: redshift. Host: sys-redshift-pre.oneboxtickets.net, Port: 5439, Schema: reports, Login: mstr_new, Password: XXXXXXXX, extra: XXXXXXXX
[2021-05-27 20:31:07,792] {dbapi.py:180} INFO - Running statement: SELECT LISTAGG(CASE WHEN LEN (RTRIM(TEXT)) = 0 THEN TEXT ELSE RTRIM(TEXT) END,'') within group(ORDER BY SEQUENCE) AS TEXT
               FROM stl_querytext
               WHERE query in (19343160);, parameters: None
[2021-05-27 20:31:08,727] {dbapi.py:186} INFO - Rows affected: 1
[2021-05-27 20:31:08,759] {taskinstance.py:1185} INFO - Marking task as SUCCESS. dag_id=Redshift_warm-up, task_id=get_query_text, execution_date=20210527T203105, start_date=20210527T203107, end_date=20210527T203108
[2021-05-27 20:31:08,806] {taskinstance.py:1246} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2021-05-27 20:31:08,814] {local_task_job.py:146} INFO - Task exited with return code 0

Upvotes: 1

Views: 2392

Answers (1)

kaxil

Reputation: 18844

The params argument is not templated, so the Jinja expression is passed through as a plain string instead of being rendered. Move the expression directly into the sql argument:

    get_query_text_task = PostgresOperator(
        task_id='get_query_text',
        postgres_conn_id='redshift',
        trigger_rule=TriggerRule.ALL_DONE,
        sql="""SELECT LISTAGG(CASE WHEN LEN(RTRIM(TEXT)) = 0 THEN TEXT ELSE RTRIM(TEXT) END, '') WITHIN GROUP (ORDER BY SEQUENCE) AS TEXT
               FROM stl_querytext
               WHERE query in {{ macros.custom_macros.render_list_sql(ti.xcom_pull(task_ids='get_query_id', key='return_value')) }};""",
    )

Upvotes: 4
