Samuel Gottipalli

Reputation: 81

Airflow 2.6.2 - ti.xcom_pull() is not retrieving any data

I have a function in a DAG file that converts data queried by the MSSQL and Oracle operators into a pandas DataFrame, but ti.xcom_pull(task_ids=task_id) returns None. When I check the XCom in the Airflow webserver (GUI), it has the correct key:value data, as expected.

Please help.

Here's what the script looks like:

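from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook
from airflow.providers.microsoft.mssql.operators.mssql import MsSqlOperator
from airflow.providers.oracle.hooks.oracle import OracleHook
from airflow.utils.task_group import TaskGroup
from pandas import DataFrame

DAG_ID = "example_db_providers"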


# Define the function to create DataFrame
def create_dataframe(task_id, database_type, connection_id, table_name=None, **context):
    print("context:", context)
    print("task_id:", task_id)
    print("database:", database_type)
    print("table_name:", table_name)
    ti = context["ti"]
    print("ti:", ti)
    xcom_data = ti.xcom_pull(task_ids=task_id)  # Retrieve XCOMs from query_task
    print(xcom_data)
    if database_type == "mssql":
        hook = MsSqlHook(mssql_conn_id=connection_id)
        column_names = hook.get_conn().cursor().description

        if column_names:
            column_names = [col[0] for col in column_names]  # Extract column names
            df = DataFrame(data=xcom_data, columns=column_names)

        else:
            # If metadata retrieval fails, handle gracefully
            print("Warning: Unable to retrieve column names from metadata.")
            df = DataFrame(data=xcom_data)  # Create DataFrame without column names

            # Optionally, set column names manually if known
    elif database_type == "oracle":
        # Attempt to retrieve column names from metadata using OracleHook
        hook = OracleHook(oracle_conn_id=connection_id)
        try:
            column_names = hook.get_records(
                sql=f"SELECT * FROM {table_name} WHERE ROWNUM <= 0"
            )[
                0
            ].keys()  # Fetch column names
            df = DataFrame(data=xcom_data, columns=column_names)

        except Exception as e:
            # If metadata retrieval fails, handle gracefully
            print("Warning: Unable to retrieve column names from metadata.")
            print(f"Error: {e}")
            df = DataFrame(data=xcom_data)  # Create DataFrame without column names

            # Optionally, set column names manually if known

    # Perform any additional operations on the DataFrame
    # ...

    print(df.head())  # Example: Print the first 5 rows


with DAG(
    dag_id=DAG_ID,
    schedule="@once",
    start_date=datetime(year=2021, month=10, day=1),
    tags=["example"],
    catchup=False,
) as dag:
    # MSSQL task group
    with TaskGroup(group_id="mssql_tasks") as mssql_tasks:
        get_gender = MsSqlOperator(
            task_id="get_gender",
            mssql_conn_id="ODS-EDWDEV-NEW",
            sql=r"""SELECT * FROM [edw_landing].[STDNT].[D_GENDER]""",
            dag=dag,
        )

        def print_table(**context):
            ti = context["ti"]
            xcom_data = ti.xcom_pull(
                task_id=get_gender
            )  # Retrieve XCOMs from query_task
            print(xcom_data)

        test_print = PythonOperator(
            task_id="test_print",
            python_callable=print_table,
            provide_context=True,
            dag=dag,
        )

        sql_dataframe_task = PythonOperator(
            task_id="create_dataframe_for_sql",
            python_callable=create_dataframe,
            op_kwargs={
                "task_id": "get_gender",
                "database_type": "mssql",
                "connection_id": "ODS-EDWDEV-NEW",
                "table_name": "[STDNT].[D_GENDER]",
            },  # Pass required arguments
            provide_context=True,
            dag=dag,
        )
        # get_gender >> test_print >> sql_dataframe_task
        # get_gender >> sql_dataframe_task
        # get_gender >> test_print

        mssql_tasks

I tried various combinations of task and TaskGroup dependencies to see whether they would make a difference. I also tried pulling the same data from the print_table function defined inside the TaskGroup, in case the XCom was not accessible from a function defined outside the DAG block, but got the same result.
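One thing worth checking here: the log reports this task as mssql_tasks.create_dataframe_for_sql, which suggests that tasks created inside the TaskGroup get the group_id as a prefix (the default prefix_group_id=True behaviour). If so, the query task would be registered as mssql_tasks.get_gender, and pulling with the bare ID get_gender would find nothing. The sketch below illustrates that lookup without needing Airflow installed. FakeTaskInstance and qualified_task_id are hypothetical names made up for this example, and the rows are placeholder data, not the real table contents.

```python
from typing import Any, Dict, Optional


def qualified_task_id(task_id: str, group_id: Optional[str] = None) -> str:
    """Build the ID of a task created inside a TaskGroup.

    Assumes the default prefix_group_id=True, where the group_id is
    prepended to the task_id, e.g. "mssql_tasks.get_gender".
    """
    return f"{group_id}.{task_id}" if group_id else task_id


class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's TaskInstance (illustration only)."""

    def __init__(self, xcoms: Dict[str, Any]) -> None:
        self._xcoms = xcoms

    def xcom_pull(self, task_ids: str) -> Any:
        # Mirror the observed behaviour: None when no XCom matches the ID.
        return self._xcoms.get(task_ids)


# Placeholder rows stored under the group-prefixed task ID.
ti = FakeTaskInstance({"mssql_tasks.get_gender": [(1, "F"), (2, "M")]})

bare = ti.xcom_pull(task_ids="get_gender")
prefixed = ti.xcom_pull(task_ids=qualified_task_id("get_gender", "mssql_tasks"))

print(bare)      # no match for the bare ID
print(prefixed)  # rows found under the prefixed ID
```

In the DAG itself this would amount to passing "task_id": "mssql_tasks.get_gender" in op_kwargs (or get_gender.task_id, which should already carry the prefix). Two related points may also matter: xcom_pull takes the keyword task_ids with a string value, so task_id=get_gender in print_table is unlikely to work as written, and with all three dependency lines commented out nothing guarantees that get_gender has finished before the pull runs.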

Here's what my log looks like:

***   * /home/airflow/logs/dag_id=example_db_providers/run_id=manual__2024-01-06T01:20:55.776122+00:00/task_id=mssql_tasks.create_dataframe_for_sql/attempt=1.log
[2024-01-05, 17:21:06 PST] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: example_db_providers.mssql_tasks.create_dataframe_for_sql manual__2024-01-06T01:20:55.776122+00:00 [queued]>
[2024-01-05, 17:21:06 PST] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: example_db_providers.mssql_tasks.create_dataframe_for_sql manual__2024-01-06T01:20:55.776122+00:00 [queued]>
[2024-01-05, 17:21:06 PST] {taskinstance.py:1308} INFO - Starting attempt 1 of 1
[2024-01-05, 17:21:06 PST] {taskinstance.py:1327} INFO - Executing <Task(PythonOperator): mssql_tasks.create_dataframe_for_sql> on 2024-01-06 01:20:55.776122+00:00
[2024-01-05, 17:21:06 PST] {standard_task_runner.py:57} INFO - Started process 129238 to run task
[2024-01-05, 17:21:06 PST] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'example_db_providers', 'mssql_tasks.create_dataframe_for_sql', 'manual__2024-01-06T01:20:55.776122+00:00', '--job-id', '386', '--raw', '--subdir', 'DAGS_FOLDER/test_dags/test_provider.py', '--cfg-path', '/tmp/tmpuyc1ib5t']
[2024-01-05, 17:21:06 PST] {standard_task_runner.py:85} INFO - Job 386: Subtask mssql_tasks.create_dataframe_for_sql
[2024-01-05, 17:21:06 PST] {task_command.py:410} INFO - Running <TaskInstance: example_db_providers.mssql_tasks.create_dataframe_for_sql manual__2024-01-06T01:20:55.776122+00:00 [running]> on host ods-etldev-app
[2024-01-05, 17:21:06 PST] {taskinstance.py:1545} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='example_db_providers' AIRFLOW_CTX_TASK_ID='mssql_tasks.create_dataframe_for_sql' AIRFLOW_CTX_EXECUTION_DATE='2024-01-06T01:20:55.776122+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-01-06T01:20:55.776122+00:00'
[2024-01-05, 17:21:06 PST] {logging_mixin.py:149} INFO - context: {'conf': <***.configuration.AirflowConfigParser object at 0x7f8446e85900>, 'dag': <DAG: example_db_providers>, 'dag_run': <DagRun example_db_providers @ 2024-01-06 01:20:55.776122+00:00: manual__2024-01-06T01:20:55.776122+00:00, state:running, queued_at: 2024-01-06 01:20:55.782080+00:00. externally triggered: True>, 'data_interval_end': DateTime(2024, 1, 6, 1, 20, 55, 776122, tzinfo=Timezone('UTC')), 'data_interval_start': DateTime(2024, 1, 6, 1, 20, 55, 776122, tzinfo=Timezone('UTC')), 'ds': '2024-01-06', 'ds_nodash': '20240106', 'execution_date': <Proxy at 0x7f842cf440c0 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7f842d2be440>, 'execution_date', DateTime(2024, 1, 6, 1, 20, 55, 776122, tzinfo=Timezone('UTC')))>, 'expanded_ti_count': None, 'inlets': [], 'logical_date': DateTime(2024, 1, 6, 1, 20, 55, 776122, tzinfo=Timezone('UTC')), 'macros': <module '***.macros' from '/home/***/env/lib/python3.10/site-packages/***/macros/__init__.py'>, 'next_ds': <Proxy at 0x7f842d227e40 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7f842d2be440>, 'next_ds', '2024-01-06')>, 'next_ds_nodash': <Proxy at 0x7f842cf734c0 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7f842d2be440>, 'next_ds_nodash', '20240106')>, 'next_execution_date': <Proxy at 0x7f842cf72500 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7f842d2be440>, 'next_execution_date', DateTime(2024, 1, 6, 1, 20, 55, 776122, tzinfo=Timezone('UTC')))>, 'outlets': [], 'params': {}, 'prev_data_interval_start_success': DateTime(2024, 1, 6, 1, 17, 54, 531673, tzinfo=Timezone('UTC')), 'prev_data_interval_end_success': DateTime(2024, 1, 6, 1, 17, 54, 531673, tzinfo=Timezone('UTC')), 'prev_ds': <Proxy at 0x7f842d1c3b00 with 
factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7f842d2be440>, 'prev_ds', '2024-01-06')>, 'prev_ds_nodash': <Proxy at 0x7f842d11a800 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7f842d2be440>, 'prev_ds_nodash', '20240106')>, 'prev_execution_date': <Proxy at 0x7f842d1eb080 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7f842d2be440>, 'prev_execution_date', DateTime(2024, 1, 6, 1, 20, 55, 776122, tzinfo=Timezone('UTC')))>, 'prev_execution_date_success': <Proxy at 0x7f842d16ee40 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7f842d2be440>, 'prev_execution_date_success', DateTime(2024, 1, 6, 1, 17, 54, 531673, tzinfo=Timezone('UTC')))>, 'prev_start_date_success': DateTime(2024, 1, 6, 1, 17, 55, 349824, tzinfo=Timezone('UTC')), 'run_id': 'manual__2024-01-06T01:20:55.776122+00:00', 'task': <Task(PythonOperator): mssql_tasks.create_dataframe_for_sql>, 'task_instance': <TaskInstance: example_db_providers.mssql_tasks.create_dataframe_for_sql manual__2024-01-06T01:20:55.776122+00:00 [running]>, 'task_instance_key_str': 'example_db_providers__mssql_tasks.create_dataframe_for_sql__20240106', 'test_mode': False, 'ti': <TaskInstance: example_db_providers.mssql_tasks.create_dataframe_for_sql manual__2024-01-06T01:20:55.776122+00:00 [running]>, 'tomorrow_ds': <Proxy at 0x7f842cf78180 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7f842d2be440>, 'tomorrow_ds', '2024-01-07')>, 'tomorrow_ds_nodash': <Proxy at 0x7f842cf781c0 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7f842d2be440>, 'tomorrow_ds_nodash', '20240107')>, 'triggering_dataset_events': <Proxy at 0x7f842d1b4840 with factory <function 
TaskInstance.get_template_context.<locals>.get_triggering_events at 0x7f842d1a4550>>, 'ts': '2024-01-06T01:20:55.776122+00:00', 'ts_nodash': '20240106T012055', 'ts_nodash_with_tz': '20240106T012055.776122+0000', 'var': {'json': None, 'value': None}, 'conn': None, 'yesterday_ds': <Proxy at 0x7f842cf78040 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7f842d2be440>, 'yesterday_ds', '2024-01-05')>, 'yesterday_ds_nodash': <Proxy at 0x7f842cf78140 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7f842d2be440>, 'yesterday_ds_nodash', '20240105')>, 'templates_dict': None}
[2024-01-05, 17:21:06 PST] {logging_mixin.py:149} INFO - task_id: get_gender
[2024-01-05, 17:21:06 PST] {logging_mixin.py:149} INFO - connection_id: ODS-EDWDEV-NEW
[2024-01-05, 17:21:06 PST] {logging_mixin.py:149} INFO - table_name: [STDNT].[D_GENDER]
[2024-01-05, 17:21:06 PST] {logging_mixin.py:149} INFO - ti: <TaskInstance: example_db_providers.mssql_tasks.create_dataframe_for_sql manual__2024-01-06T01:20:55.776122+00:00 [running]>
[2024-01-05, 17:21:06 PST] {logging_mixin.py:149} INFO - None
[2024-01-05, 17:21:06 PST] {base.py:73} INFO - Using connection ID 'ODS-EDWDEV-NEW' for task execution.
[2024-01-05, 17:21:06 PST] {logging_mixin.py:149} INFO - Warning: Unable to retrieve column names from metadata.
[2024-01-05, 17:21:06 PST] {logging_mixin.py:149} INFO - Empty DataFrame
Columns: []
Index: []
[2024-01-05, 17:21:06 PST] {python.py:183} INFO - Done. Returned value was: None
[2024-01-05, 17:21:06 PST] {taskinstance.py:1345} INFO - Marking task as SUCCESS. dag_id=example_db_providers, task_id=mssql_tasks.create_dataframe_for_sql, execution_date=20240106T012055, start_date=20240106T012106, end_date=20240106T012106
[2024-01-05, 17:21:06 PST] {local_task_job_runner.py:225} INFO - Task exited with return code 0
[2024-01-05, 17:21:07 PST] {taskinstance.py:2653} INFO - 0 downstream tasks scheduled from follow-on schedule check
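The "Unable to retrieve column names" warning in this log may be a separate issue from the None XCom. Under the Python DB-API (PEP 249), cursor.description is None until a statement has been executed on that cursor, and create_dataframe reads it from a freshly created cursor. The sketch below shows the behaviour with sqlite3, used only because it ships with Python; it assumes the MSSQL driver follows the same DB-API rule. The table and column names are hypothetical.

```python
import sqlite3

# In-memory database with a hypothetical stand-in for the gender table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE d_gender (gender_id INTEGER, gender_name TEXT)")

cursor = conn.cursor()
before = cursor.description  # nothing executed yet, so no metadata

# Run a query that returns no rows but still exposes column metadata.
cursor.execute("SELECT * FROM d_gender WHERE 1 = 0")
column_names = [col[0] for col in cursor.description]

print(before)
print(column_names)
conn.close()
```

Applied to the DAG, that would mean executing a query on the cursor obtained from the hook before reading description, rather than reading it straight after cursor() is called.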

Upvotes: 1

Views: 255

Answers (0)
