daniel guo

Reputation: 175

Airflow XCom not retrieving values between tasks in DAG

I'm experiencing an issue with Apache Airflow where values pushed to XCom in one task are not retrievable in a subsequent task within the same DAG. Here is a minimal example of my code:

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from pathlib import Path
from airflow.utils.log.logging_mixin import LoggingMixin

task_logger = LoggingMixin().log

def create_folders_with_subfolders(base_path, **kwargs):
    execution_date = kwargs['execution_date']
    dir_name = execution_date.strftime("%Y%m%d")
    main_folder = Path(base_path) / dir_name
    main_folder.mkdir(parents=True, exist_ok=True)

    subfolders = ['sftp_raw_data', 'validate_delimiter', 'lowercase', 'data_quality', 'process_files']
    paths = {}
    for subfolder in subfolders:
        subfolder_path = main_folder / subfolder
        subfolder_path.mkdir(exist_ok=True)
        paths[subfolder] = str(subfolder_path)
        kwargs['ti'].xcom_push(key=subfolder, value=str(subfolder_path))
        task_logger.info(f"XCom push for {subfolder}: {str(subfolder_path)}")
    task_logger.info(f"paths {paths}")

def validate_delimiter_and_load_csv(ti, **kwargs):
    task_logger.info(f"ti {ti}")
    zip_path = ti.xcom_pull(task_ids='create_dirs_task', key='sftp_raw_data')
    output_dir = ti.xcom_pull(task_ids='create_dirs_task', key='validate_delimiter')

    task_logger.info(f"zip_path pulled from XCom: {zip_path}")
    task_logger.info(f"output_dir pulled from XCom: {output_dir}")

with DAG(
    dag_id="example_dag",
    start_date=days_ago(1),
    catchup=False,
    schedule_interval="0 6,18 * * *",
    tags=["example"],
) as dag:

    create_dirs_task = PythonOperator(
        task_id="create_dirs_task",
        python_callable=create_folders_with_subfolders,
        op_kwargs={'base_path': '/path/to/directory'},
        provide_context=True,
    )

    validate_task = PythonOperator(
        task_id="validate_delimiter_and_load_csv",
        python_callable=validate_delimiter_and_load_csv,
        provide_context=True,
    )

    create_dirs_task >> validate_task

Description: In the DAG defined above, I have two tasks: create_dirs_task which creates directories and pushes paths to XCom, and validate_delimiter_and_load_csv which is supposed to pull these paths from XCom. However, the paths are not being pulled correctly in the validate_delimiter_and_load_csv task — the logs show None for both zip_path and output_dir.

The XCom push seems to work fine as verified by the logs and the Airflow UI showing the XCom entries. Yet, when attempting to pull these entries in the subsequent task, they return None. Both tasks use provide_context=True.

I would appreciate any suggestions on what might be causing this issue and how to resolve it. Thank you!

Upvotes: 0

Views: 213

Answers (1)

Illia Kaltovich

Reputation: 130

I'd strongly encourage you to use the TaskFlow API instead of traditional PythonOperator tasks. It simplifies writing Python DAGs significantly, reduces duplication, and makes it easier to debug locally. See the tutorial: https://airflow.apache.org/docs/apache-airflow/stable/tutorial/taskflow.html
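As a rough illustration of the local-debugging point, the path-building logic can live in a plain function that a task then calls. This is only a sketch: the names build_subfolder_paths, SUBFOLDERS, and the create flag are hypothetical and not part of your code or of Airflow.

```python
from datetime import datetime
from pathlib import Path

# Same subfolder names as in the question.
SUBFOLDERS = ("sftp_raw_data", "validate_delimiter", "lowercase", "data_quality", "process_files")


def build_subfolder_paths(base_path, run_date, create=False):
    """Return {subfolder name: path string} for one run date.

    Hypothetical helper: it has no Airflow dependency, so it can be
    called from a Python shell or a unit test.
    """
    main_folder = Path(base_path) / run_date.strftime("%Y%m%d")
    paths = {}
    for name in SUBFOLDERS:
        subfolder_path = main_folder / name
        if create:
            # Only touch the filesystem when explicitly asked to.
            subfolder_path.mkdir(parents=True, exist_ok=True)
        paths[name] = str(subfolder_path)
    return paths


if __name__ == "__main__":
    # Print the paths for an arbitrary example date without creating anything.
    for name, path in build_subfolder_paths("/path/to/directory", datetime(2024, 1, 15)).items():
        print(name, path)
```

A task body can then call this helper and return its result, so the part worth testing runs without a scheduler.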

Here is a minimal reproducible example based on your original code, with the parts it does not need commented out:

# file can create more dags from templates
from airflow import DAG

# from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from pathlib import Path

# from airflow.utils.log.logging_mixin import LoggingMixin

# task_logger = LoggingMixin().log
from airflow.decorators import task


with DAG(
    dag_id="example_dag",
    start_date=days_ago(1),
    catchup=False,
    schedule_interval="0 6,18 * * *",
    tags=["example"],
) as dag:

    @task
    def create_folders_with_subfolders(base_path, **kwargs) -> dict[str, str]:
        execution_date = kwargs["execution_date"]
        dir_name = execution_date.strftime("%Y%m%d")
        main_folder = Path(base_path) / dir_name
        # main_folder.mkdir(parents=True, exist_ok=True)

        subfolders = ["sftp_raw_data", "validate_delimiter", "lowercase", "data_quality", "process_files"]
        paths = {}
        for subfolder in subfolders:
            subfolder_path = main_folder / subfolder
            # subfolder_path.mkdir(exist_ok=True)
            paths[subfolder] = str(subfolder_path)
            kwargs["ti"].xcom_push(key=subfolder, value=str(subfolder_path))
            # task_logger.info(f"XCom push for {subfolder}: {str(subfolder_path)}")
        # task_logger.info(f"paths {paths}")
        return paths

    @task
    def validate_delimiter_and_load_csv(paths, **kwargs):
        # task_logger.info(f"ti {ti}")
        print(paths.items())
        # task_logger.info(f"zip_path pulled from XCom: {zip_path}")
        # task_logger.info(f"output_dir pulled from XCom: {output_dir}")

    validate_delimiter_and_load_csv(create_folders_with_subfolders(base_path="/path/to/directory"))

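Note: the returned dictionary is passed to the downstream task as a single value either way. To have Airflow also push each dictionary key as its own XCom entry, either set multiple_outputs=True on the @task decorator explicitly or annotate the return type as a dict, as above, so that Airflow infers it.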
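If you still pull individual keys with xcom_pull somewhere downstream, a small guard can make a missing value fail loudly instead of logging None. This is a sketch under assumptions: pull_required and StubTaskInstance are hypothetical names, the stub exists only so the snippet runs without Airflow, and the task id assumes the default TaskFlow behaviour of using the function name.

```python
def pull_required(ti, task_id, key="return_value"):
    """Pull one XCom value and raise instead of silently returning None."""
    value = ti.xcom_pull(task_ids=task_id, key=key)
    if value is None:
        raise ValueError(f"No XCom found for task_id={task_id!r}, key={key!r}")
    return value


class StubTaskInstance:
    """Hypothetical stand-in for a task instance, for local runs only."""

    def __init__(self, store):
        self._store = store  # maps (task_id, key) to the stored value

    def xcom_pull(self, task_ids=None, key="return_value"):
        # Like the real call, return None when nothing matches.
        return self._store.get((task_ids, key))


if __name__ == "__main__":
    ti = StubTaskInstance(
        {("create_folders_with_subfolders", "sftp_raw_data"): "/path/to/directory/20240115/sftp_raw_data"}
    )
    print(pull_required(ti, "create_folders_with_subfolders", "sftp_raw_data"))
```

Inside a real task you would pass the ti from the task context instead of the stub. The error message then shows exactly which task_id and key came back empty, which helps spot a mismatch between the pushed and pulled names.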

Upvotes: 0
