I have a function in a DAG file that converts data queried by the MSSQL and Oracle operators into a pandas DataFrame, but ti.xcom_pull(task_ids=task_id) returns None.
When I check the XCom itself in the Airflow webserver (GUI), it has the correct key:value data, as expected.
Please help.
Here's what the script looks like:
DAG_ID = "example_db_providers"


# Define the function to create DataFrame
def create_dataframe(task_id, database_type, connection_id, table_name=None, **context):
    print("context:", context)
    print("task_id:", task_id)
    print("database:", database_type)
    print("table_name:", table_name)
    ti = context["ti"]
    print("ti:", ti)
    xcom_data = ti.xcom_pull(task_ids=task_id)  # Retrieve XCOMs from query_task
    print(xcom_data)
    if database_type == "mssql":
        hook = MsSqlHook(mssql_conn_id=connection_id)
        column_names = hook.get_conn().cursor().description
        if column_names:
            column_names = [col[0] for col in column_names]  # Extract column names
            df = DataFrame(data=xcom_data, columns=column_names)
        else:
            # If metadata retrieval fails, handle gracefully
            print("Warning: Unable to retrieve column names from metadata.")
            df = DataFrame(data=xcom_data)  # Create DataFrame without column names
            # Optionally, set column names manually if known
    elif database_type == "oracle":
        # Attempt to retrieve column names from metadata using OracleHook
        hook = OracleHook(oracle_conn_id=connection_id)
        try:
            column_names = hook.get_records(
                sql=f"SELECT * FROM {table_name} WHERE ROWNUM <= 0"
            )[0].keys()  # Fetch column names
            df = DataFrame(data=xcom_data, columns=column_names)
        except Exception as e:
            # If metadata retrieval fails, handle gracefully
            print("Warning: Unable to retrieve column names from metadata.")
            print(f"Error: {e}")
            df = DataFrame(data=xcom_data)  # Create DataFrame without column names
            # Optionally, set column names manually if known
    # Perform any additional operations on the DataFrame
    # ...
    print(df.head())  # Example: Print the first 5 rows


with DAG(
    dag_id=DAG_ID,
    schedule="@once",
    start_date=datetime(year=2021, month=10, day=1),
    tags=["example"],
    catchup=False,
) as dag:
    # MSSQL task group
    with TaskGroup(group_id="mssql_tasks") as mssql_tasks:
        get_gender = MsSqlOperator(
            task_id="get_gender",
            mssql_conn_id="ODS-EDWDEV-NEW",
            sql=r"""SELECT * FROM [edw_landing].[STDNT].[D_GENDER]""",
            dag=dag,
        )

        def print_table(**context):
            ti = context["ti"]
            xcom_data = ti.xcom_pull(
                task_id=get_gender
            )  # Retrieve XCOMs from query_task
            print(xcom_data)

        test_print = PythonOperator(
            task_id="test_print",
            python_callable=print_table,
            provide_context=True,
            dag=dag,
        )
        sql_dataframe_task = PythonOperator(
            task_id="create_dataframe_for_sql",
            python_callable=create_dataframe,
            op_kwargs={
                "task_id": "get_gender",
                "database_type": "mssql",
                "connection_id": "ODS-EDWDEV-NEW",
                "table_name": "[STDNT].[D_GENDER]",
            },  # Pass required arguments
            provide_context=True,
            dag=dag,
        )
        # get_gender >> test_print >> sql_dataframe_task
        # get_gender >> sql_dataframe_task
        # get_gender >> test_print
    mssql_tasks
I tried various combinations of task and TaskGroup dependencies to check whether it would make a difference. I also attempted to access the same data from the print_table function inside the TaskGroup, to see whether the XCom was not accessible outside of the DAG, but got the same result.
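One thing that may be worth checking here: the log path further down shows the task id as mssql_tasks.create_dataframe_for_sql, which suggests that tasks created inside the TaskGroup get the group id as a prefix (the default when prefix_group_id is left at True). If so, the XCom pushed by the query task would be stored under mssql_tasks.get_gender, and a pull for the bare get_gender would find nothing. The snippet below is a plain-Python sketch of that lookup, not Airflow code: xcom_store, qualified_task_id, fake_xcom_pull and the sample rows are all made up for illustration.

```python
# Plain-Python model of an XCom lookup keyed by the full task id.
# Everything here is a hypothetical stand-in, not the Airflow API.


def qualified_task_id(task_id, group_id=None):
    """Build the id a task would have inside a TaskGroup,
    assuming the default prefix_group_id=True."""
    return f"{group_id}.{task_id}" if group_id else task_id


# Made-up rows, stored under the prefixed id of the query task
xcom_store = {"mssql_tasks.get_gender": [(1, "F"), (2, "M")]}


def fake_xcom_pull(task_ids):
    """Return the stored value for an exact task id, or None if absent."""
    return xcom_store.get(task_ids)


bare = fake_xcom_pull("get_gender")
prefixed = fake_xcom_pull(qualified_task_id("get_gender", "mssql_tasks"))
print(bare)
print(prefixed)
```

If this is what is happening, passing "task_id": "mssql_tasks.get_gender" in op_kwargs would be the thing to try. Separately, print_table calls xcom_pull with task_id=get_gender (the operator object) rather than task_ids with a string id, which may be worth a look as well.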
Here's what my Log looks like:
*** * /home/airflow/logs/dag_id=example_db_providers/run_id=manual__2024-01-06T01:20:55.776122+00:00/task_id=mssql_tasks.create_dataframe_for_sql/attempt=1.log
[2024-01-05, 17:21:06 PST] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: example_db_providers.mssql_tasks.create_dataframe_for_sql manual__2024-01-06T01:20:55.776122+00:00 [queued]>
[2024-01-05, 17:21:06 PST] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: example_db_providers.mssql_tasks.create_dataframe_for_sql manual__2024-01-06T01:20:55.776122+00:00 [queued]>
[2024-01-05, 17:21:06 PST] {taskinstance.py:1308} INFO - Starting attempt 1 of 1
[2024-01-05, 17:21:06 PST] {taskinstance.py:1327} INFO - Executing <Task(PythonOperator): mssql_tasks.create_dataframe_for_sql> on 2024-01-06 01:20:55.776122+00:00
[2024-01-05, 17:21:06 PST] {standard_task_runner.py:57} INFO - Started process 129238 to run task
[2024-01-05, 17:21:06 PST] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'example_db_providers', 'mssql_tasks.create_dataframe_for_sql', 'manual__2024-01-06T01:20:55.776122+00:00', '--job-id', '386', '--raw', '--subdir', 'DAGS_FOLDER/test_dags/test_provider.py', '--cfg-path', '/tmp/tmpuyc1ib5t']
[2024-01-05, 17:21:06 PST] {standard_task_runner.py:85} INFO - Job 386: Subtask mssql_tasks.create_dataframe_for_sql
[2024-01-05, 17:21:06 PST] {task_command.py:410} INFO - Running <TaskInstance: example_db_providers.mssql_tasks.create_dataframe_for_sql manual__2024-01-06T01:20:55.776122+00:00 [running]> on host ods-etldev-app
[2024-01-05, 17:21:06 PST] {taskinstance.py:1545} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='example_db_providers' AIRFLOW_CTX_TASK_ID='mssql_tasks.create_dataframe_for_sql' AIRFLOW_CTX_EXECUTION_DATE='2024-01-06T01:20:55.776122+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-01-06T01:20:55.776122+00:00'
[2024-01-05, 17:21:06 PST] {logging_mixin.py:149} INFO - context: {'conf': <***.configuration.AirflowConfigParser object at 0x7f8446e85900>, 'dag': <DAG: example_db_providers>, 'dag_run': <DagRun example_db_providers @ 2024-01-06 01:20:55.776122+00:00: manual__2024-01-06T01:20:55.776122+00:00, state:running, queued_at: 2024-01-06 01:20:55.782080+00:00. externally triggered: True>, 'data_interval_end': DateTime(2024, 1, 6, 1, 20, 55, 776122, tzinfo=Timezone('UTC')), 'data_interval_start': DateTime(2024, 1, 6, 1, 20, 55, 776122, tzinfo=Timezone('UTC')), 'ds': '2024-01-06', 'ds_nodash': '20240106', 'execution_date': <Proxy at 0x7f842cf440c0 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7f842d2be440>, 'execution_date', DateTime(2024, 1, 6, 1, 20, 55, 776122, tzinfo=Timezone('UTC')))>, 'expanded_ti_count': None, 'inlets': [], 'logical_date': DateTime(2024, 1, 6, 1, 20, 55, 776122, tzinfo=Timezone('UTC')), 'macros': <module '***.macros' from '/home/***/env/lib/python3.10/site-packages/***/macros/__init__.py'>, 'next_ds': <Proxy at 0x7f842d227e40 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7f842d2be440>, 'next_ds', '2024-01-06')>, 'next_ds_nodash': <Proxy at 0x7f842cf734c0 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7f842d2be440>, 'next_ds_nodash', '20240106')>, 'next_execution_date': <Proxy at 0x7f842cf72500 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7f842d2be440>, 'next_execution_date', DateTime(2024, 1, 6, 1, 20, 55, 776122, tzinfo=Timezone('UTC')))>, 'outlets': [], 'params': {}, 'prev_data_interval_start_success': DateTime(2024, 1, 6, 1, 17, 54, 531673, tzinfo=Timezone('UTC')), 'prev_data_interval_end_success': DateTime(2024, 1, 6, 1, 17, 54, 531673, tzinfo=Timezone('UTC')), 'prev_ds': <Proxy at 0x7f842d1c3b00 with 
factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7f842d2be440>, 'prev_ds', '2024-01-06')>, 'prev_ds_nodash': <Proxy at 0x7f842d11a800 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7f842d2be440>, 'prev_ds_nodash', '20240106')>, 'prev_execution_date': <Proxy at 0x7f842d1eb080 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7f842d2be440>, 'prev_execution_date', DateTime(2024, 1, 6, 1, 20, 55, 776122, tzinfo=Timezone('UTC')))>, 'prev_execution_date_success': <Proxy at 0x7f842d16ee40 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7f842d2be440>, 'prev_execution_date_success', DateTime(2024, 1, 6, 1, 17, 54, 531673, tzinfo=Timezone('UTC')))>, 'prev_start_date_success': DateTime(2024, 1, 6, 1, 17, 55, 349824, tzinfo=Timezone('UTC')), 'run_id': 'manual__2024-01-06T01:20:55.776122+00:00', 'task': <Task(PythonOperator): mssql_tasks.create_dataframe_for_sql>, 'task_instance': <TaskInstance: example_db_providers.mssql_tasks.create_dataframe_for_sql manual__2024-01-06T01:20:55.776122+00:00 [running]>, 'task_instance_key_str': 'example_db_providers__mssql_tasks.create_dataframe_for_sql__20240106', 'test_mode': False, 'ti': <TaskInstance: example_db_providers.mssql_tasks.create_dataframe_for_sql manual__2024-01-06T01:20:55.776122+00:00 [running]>, 'tomorrow_ds': <Proxy at 0x7f842cf78180 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7f842d2be440>, 'tomorrow_ds', '2024-01-07')>, 'tomorrow_ds_nodash': <Proxy at 0x7f842cf781c0 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7f842d2be440>, 'tomorrow_ds_nodash', '20240107')>, 'triggering_dataset_events': <Proxy at 0x7f842d1b4840 with factory <function 
TaskInstance.get_template_context.<locals>.get_triggering_events at 0x7f842d1a4550>>, 'ts': '2024-01-06T01:20:55.776122+00:00', 'ts_nodash': '20240106T012055', 'ts_nodash_with_tz': '20240106T012055.776122+0000', 'var': {'json': None, 'value': None}, 'conn': None, 'yesterday_ds': <Proxy at 0x7f842cf78040 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7f842d2be440>, 'yesterday_ds', '2024-01-05')>, 'yesterday_ds_nodash': <Proxy at 0x7f842cf78140 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7f842d2be440>, 'yesterday_ds_nodash', '20240105')>, 'templates_dict': None}
[2024-01-05, 17:21:06 PST] {logging_mixin.py:149} INFO - task_id: get_gender
[2024-01-05, 17:21:06 PST] {logging_mixin.py:149} INFO - connection_id: ODS-EDWDEV-NEW
[2024-01-05, 17:21:06 PST] {logging_mixin.py:149} INFO - table_name: [STDNT].[D_GENDER]
[2024-01-05, 17:21:06 PST] {logging_mixin.py:149} INFO - ti: <TaskInstance: example_db_providers.mssql_tasks.create_dataframe_for_sql manual__2024-01-06T01:20:55.776122+00:00 [running]>
[2024-01-05, 17:21:06 PST] {logging_mixin.py:149} INFO - None
[2024-01-05, 17:21:06 PST] {base.py:73} INFO - Using connection ID 'ODS-EDWDEV-NEW' for task execution.
[2024-01-05, 17:21:06 PST] {logging_mixin.py:149} INFO - Warning: Unable to retrieve column names from metadata.
[2024-01-05, 17:21:06 PST] {logging_mixin.py:149} INFO - Empty DataFrame
Columns: []
Index: []
[2024-01-05, 17:21:06 PST] {python.py:183} INFO - Done. Returned value was: None
[2024-01-05, 17:21:06 PST] {taskinstance.py:1345} INFO - Marking task as SUCCESS. dag_id=example_db_providers, task_id=mssql_tasks.create_dataframe_for_sql, execution_date=20240106T012055, start_date=20240106T012106, end_date=20240106T012106
[2024-01-05, 17:21:06 PST] {local_task_job_runner.py:225} INFO - Task exited with return code 0
[2024-01-05, 17:21:07 PST] {taskinstance.py:2653} INFO - 0 downstream tasks scheduled from follow-on schedule check
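The "Unable to retrieve column names from metadata" warning in the log may have a separate cause: under the Python DB-API (PEP 249), cursor.description is None until a query has been executed on that cursor, and create_dataframe reads it from a freshly created cursor. A minimal sketch of that behaviour, using the standard library's sqlite3 as a stand-in for the MSSQL connection (the in-memory table d_gender and its columns are invented for the example):

```python
import sqlite3

# In-memory database standing in for the real MSSQL connection (assumption)
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE d_gender (gender_id INTEGER, gender_desc TEXT)")
conn.execute("INSERT INTO d_gender VALUES (1, 'Female')")

cur = conn.cursor()
before = cur.description  # nothing executed on this cursor yet

cur.execute("SELECT * FROM d_gender")
# After a SELECT, each description entry starts with the column name
column_names = [col[0] for col in cur.description]
rows = cur.fetchall()
conn.close()

print(before)
print(column_names)
print(rows)
```

Whether the driver behind MsSqlHook behaves exactly like sqlite3 here is an assumption, but it would be consistent with the else branch running in the log above.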