Sandeep Mohanty
Sandeep Mohanty

Reputation: 1552

Xcom Pull returning None

I have a DAG where i am doing some data loading activity and at the end i am fetching the DAG run state in a Xcom and using that Xcom i want to pull its value to another dag. while doing so i am getting returned output as None.

lOAD DAG :

from datetime import timedelta, datetime
from airflow import models
from airflow import DAG
from airflow.contrib.operators import gcs_to_bq
from airflow.operators.python_operator import PythonOperator 
from airflow.models import Variable
from airflow.models import DagRun
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from google.cloud import bigquery
import pandas as pd
import logging



DEFAULT_ARGS = {
    'depends_on_past': False,
    'start_date': datetime(2023, 9, 27),
    'catchup': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=1)
}

dag = DAG(
    'testing',
    catchup=False,
    default_args=DEFAULT_ARGS,
    schedule_interval=None,
    start_date=datetime(2023, 9, 27)
)

def date_extract(**kwargs):
    today_date = datetime.now() + timedelta(hours=5, minutes=30)
    Snapshot_Month_Date = today_date.replace(day=1) - timedelta(days=1)
    partition_date = str(Snapshot_Month_Date.strftime("%Y-%m-%d").replace('-', ''))
    load_date = str(today_date.strftime("%Y-%m-%d"))
    Snapshot_Month_Date = Snapshot_Month_Date.strftime("%Y-%m-%d")
    kwargs['ti'].xcom_push(key="Snapshot_Month_Date", value=str(Snapshot_Month_Date))
    kwargs['ti'].xcom_push(key="partition_date", value=partition_date)
    kwargs['ti'].xcom_push(key="load_date", value=load_date)

Load_DATE_FUNCTION = PythonOperator(
    task_id="Load_DATE_FUNCTION",
    python_callable=date_extract,
    provide_context=True,
    dag=dag
)
load_date="{{task_instance.xcom_pull(key='load_date',task_ids='Load_DATE_FUNCTION')}}"
Snapshot_Month_Date="{{task_instance.xcom_pull(key='Snapshot_Month_Date',task_ids='Load_DATE_FUNCTION')}}"

# Add this function at the end of each DAG (A, B, and C)
def push_dag_status_to_xcom(**kwargs):
    latest_dag_run = None
    for dag_run in DagRun.find(dag_id='testing', state='success'):
        if latest_dag_run is None or dag_run.execution_date > latest_dag_run.execution_date:
            latest_dag_run = dag_run
    
    if latest_dag_run:
        latest_dag_run_id = latest_dag_run.run_id
        latest_dag_run_status = latest_dag_run.state
        logging.info("Latest DAG run ID: %s, Status: %s", latest_dag_run_id, latest_dag_run_status)
        
        if latest_dag_run_status == 'success':
            kwargs['ti'].xcom_push(key='dag_status', value='success')
        else:
            kwargs['ti'].xcom_push(key='dag_status', value='failed')
    else:
        logging.info("No successful DAG runs found")
        kwargs['ti'].xcom_push(key='dag_status', value='no_successful_runs')

push_dag_status_to_xcom = PythonOperator(
    task_id='push_dag_status_to_xcom',
    python_callable=push_dag_status_to_xcom,
    provide_context=True,
    )
    
Load_DATE_FUNCTION>>push_dag_status_to_xcom


dag state success from load dag

fetch dag

from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

def get_dag_status(**kwargs):
  # Replace 'OPEN_ORDERS_LOAD_ALL_INSTANCES_STG_LOAD_TESTING' with the actual DAG ID of your first DAG (A)
  dag_status = kwargs['ti'].xcom_pull(
      dag_id='OPEN_ORDERS_LOAD_ALL_INSTANCES_STG_LOAD_TESTING',
      task_ids='push_dag_status_to_xcom',
      key='dag_status'
  )

dag = DAG(
    'fetch_dag_status_example',
    description='Fetch DAG status example',
    schedule_interval=None,
    start_date=datetime(2023, 9, 27),
    catchup=False
)

fetch_dag_status_task = PythonOperator(
    task_id='fetch_dag_status',
    python_callable=fetch_dag_status,
    provide_context=True,
    dag=dag
)

dag returning none

thank you

Upvotes: 0

Views: 76

Answers (0)

Related Questions