I'm writing an Airflow DAG with the following structure:
import logging

import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

def catch_output(ti, **kwargs):
    # Pull whatever query_task pushed to XCom (default key "return_value") and log it.
    query_result = ti.xcom_pull(task_ids="query_task")
    logging.info(f"{query_result}")

with DAG(
    'dag_name',
    schedule=None,
    start_date=pendulum.datetime(2025, 2, 25, tz='UTC'),
    max_active_runs=1,
    catchup=False,
    default_args={...}
) as dag:
    query_task = SQLExecuteQueryOperator(
        task_id='query_task',
        conn_id='xyz',
        show_return_value_in_logs=True,
        split_statements=True,
        sql='SELECT COUNT(*) FROM TABLE',
        return_last=True,
        do_xcom_push=True
    )
    catch_output = PythonOperator(
        task_id='catch_output',
        python_callable=catch_output
    )
    query_task >> catch_output
The first task works correctly and its output is logged in Airflow. The second task also finishes with success, but it doesn't log anything: the variable query_result is None. I suspect that the task running the query on Starburst isn't pushing anything to XCom, but I don't know how to fix that.
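In case it's useful, this is the kind of check I can add to the callable to see whether anything was pushed at all. As far as I understand, xcom_pull defaults to the key "return_value", and passing key=None should remove that filter (that's my reading of the docs, not something I've confirmed):

def catch_output(ti, **kwargs):
    # Explicit pull of the default key; should be equivalent to the call above.
    query_result = ti.xcom_pull(task_ids="query_task", key="return_value")
    logging.info(f"return_value: {query_result}")
    # key=None removes the key filter, so this should list every XCom
    # that query_task pushed, if it pushed anything at all.
    all_xcoms = ti.xcom_pull(task_ids="query_task", key=None)
    logging.info(f"all XComs: {all_xcoms}")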
Important information: at least for the moment, I can only connect to the database through SQLExecuteQueryOperator.
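For completeness, the hook-based route I can't currently take would look roughly like this sketch (hypothetical: it assumes the 'xyz' connection resolves to a DbApiHook-compatible hook, which I can't rely on right now):

from airflow.hooks.base import BaseHook

def query_directly(**kwargs):
    # Hypothetical fallback: resolve the provider hook behind conn 'xyz'
    # and fetch the first row of the result directly.
    hook = BaseHook.get_connection("xyz").get_hook()
    return hook.get_first("SELECT COUNT(*) FROM TABLE")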