ennezetaqu

How to retrieve the output of a SQL query executed with SQLExecuteQueryOperator in Airflow

I'm writing an Airflow DAG with the following structure:

  1. Execute a simple SELECT COUNT(*) on a Starburst database with SQLExecuteQueryOperator;
  2. Retrieve the result (which should be 0 or 1) and use it to check whether a certain record is in the table or not.
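
Step 2 can be sketched as a small helper that turns the query result into a yes/no answer. This is a minimal sketch, assuming the XCom value has the shape produced by the operator's default fetch-all handler: a list of rows, where each row is a tuple (possibly a list after XCom serialization) and the count is the first column of the first row. `record_exists` is a hypothetical name, and treating any count above 0 as "record present" is an assumption.

```python
from typing import Any, Optional, Sequence


def record_exists(query_result: Optional[Sequence[Any]]) -> bool:
    """Turn the fetch-all style output of a COUNT(*) query into a boolean.

    Assumes (hypothetically) a list of rows such as [(1,)] or [[1]],
    with the count in the first column of the first row.
    """
    if query_result is None:
        # Nothing reached the downstream task (e.g. the XCom pull returned None).
        raise ValueError("query result is None; nothing was pulled from XCom")
    if len(query_result) == 0:
        raise ValueError("query returned no rows")
    first_row = query_result[0]
    count = int(first_row[0])
    # Assumption: any count above 0 means the record is in the table.
    return count > 0
```

For example, `record_exists([(1,)])` gives `True` and `record_exists([[0]])` gives `False`; a `None` input raises instead of being silently treated as "not found".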

This is the DAG:

import logging

import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator


def catch_output(ti, **kwargs):
    # Pull the return value that query_task pushed to XCom
    query_result = ti.xcom_pull(task_ids="query_task")
    logging.info(f"{query_result}")


with DAG(
    'dag_name',
    schedule=None,
    start_date=pendulum.datetime(2025, 2, 25, tz='UTC'),
    max_active_runs=1,
    catchup=False,
    default_args={...}
) as dag:

    # Run the COUNT(*) on Starburst; with return_last=True only the
    # result of the last statement is returned (and pushed to XCom)
    query_task = SQLExecuteQueryOperator(
        task_id='query_task',
        conn_id='xyz',
        show_return_value_in_logs=True,
        split_statements=True,
        sql='SELECT COUNT(*) FROM TABLE',
        return_last=True,
        do_xcom_push=True
    )

    # Log whatever query_task pushed to XCom
    catch_output_task = PythonOperator(
        task_id='catch_output',
        python_callable=catch_output,
        dag=dag
    )

    query_task >> catch_output_task

The first task works correctly and its output appears in the Airflow log. The second task also finishes successfully, but it doesn't log the query result: the variable query_result is None. I think the reason is that the task running the query on Starburst does not push anything to XCom, but I don't know how to fix this.
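
To make this symptom visible instead of having the second task succeed with nothing useful logged, the callable can be written to fail when the pull returns nothing. A minimal sketch, assuming the query task's result is stored under Airflow's default XCom key `return_value`; it is a variant of the `catch_output` callable above, not a fix for the missing value itself.

```python
import logging
from typing import Any


def catch_output(ti: Any, **kwargs: Any) -> Any:
    """Pull the query task's result and fail loudly if it is missing."""
    # "return_value" is the default XCom key for an operator's return value.
    query_result = ti.xcom_pull(task_ids="query_task", key="return_value")
    if query_result is None:
        # Raising marks the Airflow task as failed instead of succeeding silently.
        raise ValueError("No XCom value found for task 'query_task'")
    logging.info("query_task returned: %r", query_result)
    return query_result
```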

Important information: I cannot connect to the database except with SQLExecuteQueryOperator (at least at the moment).
