I'm experiencing an issue with Apache Airflow where values pushed to XCom in one task are not retrievable in a subsequent task within the same DAG. Here is a minimal example of my code:
```python
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from pathlib import Path
from airflow.utils.log.logging_mixin import LoggingMixin

task_logger = LoggingMixin().log


def create_folders_with_subfolders(base_path, **kwargs):
    execution_date = kwargs['execution_date']
    dir_name = execution_date.strftime("%Y%m%d")
    main_folder = Path(base_path) / dir_name
    main_folder.mkdir(parents=True, exist_ok=True)
    subfolders = ['sftp_raw_data', 'validate_delimiter', 'lowercase', 'data_quality', 'process_files']
    paths = {}
    for subfolder in subfolders:
        subfolder_path = main_folder / subfolder
        subfolder_path.mkdir(exist_ok=True)
        paths[subfolder] = str(subfolder_path)
        kwargs['ti'].xcom_push(key=subfolder, value=str(subfolder_path))
        task_logger.info(f"XCom push for {subfolder}: {str(subfolder_path)}")
    task_logger.info(f"paths {paths}")


def validate_delimiter_and_load_csv(ti, **kwargs):
    task_logger.info(f"ti {ti}")
    zip_path = ti.xcom_pull(task_ids='create_dirs_task', key='sftp_raw_data')
    output_dir = ti.xcom_pull(task_ids='create_dirs_task', key='validate_delimiter')
    task_logger.info(f"zip_path pulled from XCom: {zip_path}")
    task_logger.info(f"output_dir pulled from XCom: {output_dir}")


with DAG(
    dag_id="example_dag",
    start_date=days_ago(1),
    catchup=False,
    schedule_interval="0 6,18 * * *",
    tags=["example"],
) as dag:
    create_dirs_task = PythonOperator(
        task_id="create_dirs_task",
        python_callable=create_folders_with_subfolders,
        op_kwargs={'base_path': '/path/to/directory'},
        provide_context=True,
    )
    validate_task = PythonOperator(
        task_id="validate_delimiter_and_load_csv",
        python_callable=validate_delimiter_and_load_csv,
        provide_context=True,
    )
    create_dirs_task >> validate_task
```
In the DAG above, I have two tasks: create_dirs_task, which creates the directories and pushes their paths to XCom, and validate_delimiter_and_load_csv, which is supposed to pull those paths from XCom. However, the pull does not work: the logs show None for both zip_path and output_dir.
The XCom push itself appears to work: the logs and the Airflow UI both show the XCom entries. Yet when the downstream task tries to pull them, it gets None. Both tasks use provide_context=True.
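When chasing a None from xcom_pull, it can help to remember what a pull matches on. By default, ti.xcom_pull only looks at XComs for the current dag_id and the current DAG run, filtered by the given task_ids and key, so a mismatch in any of these (a typo in a task_id or key, or the two task instances belonging to different runs) yields None. The stdlib-only sketch below is a toy model of that scoping, not Airflow's implementation; the ToyXComStore class and the run IDs are made up for illustration.

```python
from typing import Any, Dict, Optional, Tuple

# Toy in-memory XCom table (NOT Airflow's implementation). It only models the
# idea that a stored value is identified by dag_id, run_id, task_id and key.
XComKey = Tuple[str, str, str, str]


class ToyXComStore:
    def __init__(self) -> None:
        self._rows: Dict[XComKey, Any] = {}

    def push(self, dag_id: str, run_id: str, task_id: str, key: str, value: Any) -> None:
        self._rows[(dag_id, run_id, task_id, key)] = value

    def pull(self, dag_id: str, run_id: str, task_id: str, key: str) -> Optional[Any]:
        # A miss on ANY of the four fields returns None, which is the
        # symptom seen in the question.
        return self._rows.get((dag_id, run_id, task_id, key))


store = ToyXComStore()
run = "scheduled__2024-01-01T06:00:00+00:00"  # hypothetical run ID
store.push("example_dag", run, "create_dirs_task", "sftp_raw_data",
           "/path/to/directory/20240101/sftp_raw_data")

hit = store.pull("example_dag", run, "create_dirs_task", "sftp_raw_data")
# Same task and key, but looked up from a different (hypothetical) run:
wrong_run = store.pull("example_dag", "manual__2024-01-01T07:00:00+00:00",
                       "create_dirs_task", "sftp_raw_data")
# Same run and key, but a misspelled task_id:
wrong_task = store.pull("example_dag", run, "create_dirs", "sftp_raw_data")
print(hit, wrong_run, wrong_task)
```

In the real UI, comparing the run ID shown for both task instances is a quick way to rule out the "different run" case.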
I would appreciate any suggestions on what might be causing this issue and how to resolve it. Thank you!
I strongly encourage you to use the TaskFlow API instead of traditional PythonOperators. It makes writing Python DAGs significantly simpler: return values are passed between tasks through XCom implicitly, which removes the explicit push/pull boilerplate, and tasks are easier to debug locally. See the tutorial: https://airflow.apache.org/docs/apache-airflow/stable/tutorial/taskflow.html
Here is a minimal reproducible example based on your code, with the directory creation and the custom logger commented out:
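In the same spirit of debugging locally, the path-naming logic from the question can live in a plain function and be checked with an ordinary Python call, without a scheduler. A minimal sketch, assuming a hypothetical helper build_run_paths that mirrors the base_path / YYYYMMDD / subfolder scheme of create_folders_with_subfolders; it only computes paths, so the mkdir calls would stay inside the task:

```python
from datetime import datetime
from pathlib import Path
from typing import Dict

# Same subfolder names as in the question.
SUBFOLDERS = ["sftp_raw_data", "validate_delimiter", "lowercase", "data_quality", "process_files"]


def build_run_paths(base_path: str, run_date: datetime) -> Dict[str, str]:
    """Hypothetical helper: compute (but do not create) one path per subfolder,
    laid out as base_path / YYYYMMDD / subfolder. No Airflow dependency."""
    main_folder = Path(base_path) / run_date.strftime("%Y%m%d")
    return {name: str(main_folder / name) for name in SUBFOLDERS}


paths = build_run_paths("/path/to/directory", datetime(2024, 1, 1, 6))
print(paths["sftp_raw_data"])
```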
```python
# file can create more dags from templates
from airflow import DAG
# from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from pathlib import Path
# from airflow.utils.log.logging_mixin import LoggingMixin
# task_logger = LoggingMixin().log
from airflow.decorators import task

with DAG(
    dag_id="example_dag",
    start_date=days_ago(1),
    catchup=False,
    schedule_interval="0 6,18 * * *",
    tags=["example"],
) as dag:

    @task
    def create_folders_with_subfolders(base_path, **kwargs) -> dict[str, str]:
        execution_date = kwargs["execution_date"]
        dir_name = execution_date.strftime("%Y%m%d")
        main_folder = Path(base_path) / dir_name
        # main_folder.mkdir(parents=True, exist_ok=True)
        subfolders = ["sftp_raw_data", "validate_delimiter", "lowercase", "data_quality", "process_files"]
        paths = {}
        for subfolder in subfolders:
            subfolder_path = main_folder / subfolder
            # subfolder_path.mkdir(exist_ok=True)
            paths[subfolder] = str(subfolder_path)
            kwargs["ti"].xcom_push(key=subfolder, value=str(subfolder_path))
            # task_logger.info(f"XCom push for {subfolder}: {str(subfolder_path)}")
        # task_logger.info(f"paths {paths}")
        return paths

    @task
    def validate_delimiter_and_load_csv(paths, **kwargs):
        # task_logger.info(f"ti {ti}")
        print(paths.items())
        # task_logger.info(f"zip_path pulled from XCom: {zip_path}")
        # task_logger.info(f"output_dir pulled from XCom: {output_dir}")

    validate_delimiter_and_load_csv(create_folders_with_subfolders(base_path="/path/to/directory"))
```
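Pay attention: the downstream task receives the whole returned dictionary either way. If you also want each entry stored as its own XCom (so that, for example, sftp_raw_data can be pulled by key), set multiple_outputs=True on the @task decorator explicitly, or provide a dict return-type annotation such as the -> dict[str, str] used above, from which Airflow infers multiple_outputs.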
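A stdlib-only toy model of that difference, written to mirror how Airflow 2 handles a TaskFlow return value as I understand it (this is not Airflow's code, and push_task_result is a hypothetical name): the whole return value is stored under the return_value key in both cases, and multiple_outputs adds one extra XCom per dictionary key.

```python
from typing import Any, Dict


def push_task_result(result: Any, multiple_outputs: bool) -> Dict[str, Any]:
    """Toy model: map each XCom key to the value a TaskFlow task would store."""
    # The full return value is always stored under "return_value".
    xcoms: Dict[str, Any] = {"return_value": result}
    if multiple_outputs:
        # multiple_outputs only makes sense for a dict with string keys.
        if not isinstance(result, dict):
            raise TypeError("multiple_outputs=True expects the task to return a dict")
        for key, value in result.items():
            if not isinstance(key, str):
                raise TypeError("XCom keys must be strings")
            xcoms[key] = value  # one additional XCom per dict entry
    return xcoms


returned = {
    "sftp_raw_data": "/path/to/directory/20240101/sftp_raw_data",
    "validate_delimiter": "/path/to/directory/20240101/validate_delimiter",
}
single = push_task_result(returned, multiple_outputs=False)
split = push_task_result(returned, multiple_outputs=True)
print(sorted(single))
print(sorted(split))
```

With multiple_outputs in effect, a single entry can then be pulled by key, for example ti.xcom_pull(task_ids="create_folders_with_subfolders", key="sftp_raw_data"), since a TaskFlow task's task_id defaults to the function name.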