Reputation: 1552
I have a DAG that performs some data loading and, at the end, pushes the DAG run state to an XCom. I want to pull that XCom value from another DAG, but the pull returns None.
Load DAG:
from datetime import timedelta, datetime
from airflow import models
from airflow import DAG
from airflow.contrib.operators import gcs_to_bq
from airflow.operators.python_operator import PythonOperator
from airflow.models import Variable
from airflow.models import DagRun
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from google.cloud import bigquery
import pandas as pd
import logging
# Arguments applied to every task of the load DAG.
# NOTE(review): 'catchup' is a DAG-level option, so it presumably has no effect
# inside default_args; catchup=False is also passed to DAG(...) directly.
DEFAULT_ARGS = dict(
    depends_on_past=False,
    start_date=datetime(2023, 9, 27),
    catchup=False,
    retries=3,
    retry_delay=timedelta(minutes=1),
)
# Load DAG: no schedule (schedule_interval=None), so it runs only when triggered.
dag = DAG(
    dag_id='testing',
    start_date=datetime(2023, 9, 27),
    schedule_interval=None,
    default_args=DEFAULT_ARGS,
    catchup=False,
)
def date_extract(**kwargs):
    """Compute the load and snapshot dates and push them to XCom.

    Pushes three strings through ``kwargs['ti'].xcom_push``:

    * ``Snapshot_Month_Date`` -- last day of the previous month, ``YYYY-MM-DD``.
    * ``partition_date`` -- that same snapshot date as ``YYYYMMDD``.
    * ``load_date`` -- today's date in UTC+05:30 (presumably IST), ``YYYY-MM-DD``.
    """
    from datetime import timezone

    # Adding +05:30 to naive *local* time is only right when the worker clock
    # is UTC; an aware "now" in a fixed +05:30 zone is correct on any host.
    utc_plus_0530 = timezone(timedelta(hours=5, minutes=30))
    today_date = datetime.now(utc_plus_0530)

    # First day of the current month minus one day = last day of previous month.
    snapshot_day = today_date.replace(day=1) - timedelta(days=1)

    snapshot_month_date = snapshot_day.strftime("%Y-%m-%d")
    partition_date = snapshot_day.strftime("%Y%m%d")
    load_date = today_date.strftime("%Y-%m-%d")

    ti = kwargs['ti']
    ti.xcom_push(key="Snapshot_Month_Date", value=snapshot_month_date)
    ti.xcom_push(key="partition_date", value=partition_date)
    ti.xcom_push(key="load_date", value=load_date)
# Task that runs date_extract, which pushes the computed dates to XCom.
Load_DATE_FUNCTION = PythonOperator(
    dag=dag,
    task_id="Load_DATE_FUNCTION",
    provide_context=True,
    python_callable=date_extract,
)
# Jinja template strings that read values pushed by the Load_DATE_FUNCTION task.
# NOTE(review): at module level these are plain strings; presumably they are meant
# to be handed to a templated operator field, which is where they get rendered.
load_date, Snapshot_Month_Date = (
    "{{task_instance.xcom_pull(key='load_date',task_ids='Load_DATE_FUNCTION')}}",
    "{{task_instance.xcom_pull(key='Snapshot_Month_Date',task_ids='Load_DATE_FUNCTION')}}",
)
# Add this function at the end of each DAG (A, B, and C)
def push_dag_status_to_xcom(**kwargs):
    """Push the status of this DAG's latest successful run to XCom.

    The DAG id is taken from the task context's ``dag`` (so the same function
    works in every DAG it is added to), falling back to ``'testing'`` when no
    ``dag`` is in the context. Among the successful runs returned by
    ``DagRun.find``, the one with the greatest ``execution_date`` is chosen and
    a status string is pushed under key ``'dag_status'``:

    * ``'success'`` -- a successful run was found.
    * ``'failed'`` -- defensive fallback; unreachable in practice, because the
      lookup is already filtered on ``state='success'``.
    * ``'no_successful_runs'`` -- no successful run exists.

    NOTE(review): this task executes inside the current DAG run, which is not yet
    'success' at that moment, so the run reported here is a *previous* run.
    """
    dag = kwargs.get('dag')
    dag_id = dag.dag_id if dag is not None else 'testing'

    successful_runs = DagRun.find(dag_id=dag_id, state='success')
    latest_dag_run = (
        max(successful_runs, key=lambda run: run.execution_date)
        if successful_runs else None
    )

    if latest_dag_run is None:
        logging.info("No successful DAG runs found")
        kwargs['ti'].xcom_push(key='dag_status', value='no_successful_runs')
        return

    logging.info("Latest DAG run ID: %s, Status: %s",
                 latest_dag_run.run_id, latest_dag_run.state)
    status = 'success' if latest_dag_run.state == 'success' else 'failed'
    kwargs['ti'].xcom_push(key='dag_status', value=status)
# Task that publishes this DAG's status to XCom (key 'dag_status') for other DAGs.
# NOTE(review): this rebinds the name push_dag_status_to_xcom from the callable to
# the operator. It works because python_callable captures the function first, but
# a distinct name (e.g. push_dag_status_task) would be clearer.
push_dag_status_to_xcom = PythonOperator(
    task_id='push_dag_status_to_xcom',
    python_callable=push_dag_status_to_xcom,
    provide_context=True,
    # Attach explicitly, like Load_DATE_FUNCTION. Without this, the task only
    # joins the DAG when the `>>` below adopts it from its upstream task.
    dag=dag,
)
Load_DATE_FUNCTION >> push_dag_status_to_xcom
Fetch DAG:
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
def get_dag_status(source_dag_id='OPEN_ORDERS_LOAD_ALL_INSTANCES_STG_LOAD_TESTING', **kwargs):
    """Pull the ``dag_status`` XCom published by another DAG and return it.

    Args:
        source_dag_id: dag_id of the DAG whose ``push_dag_status_to_xcom`` task
            pushed the status. Defaults to the id used originally; it can be
            overridden via ``op_kwargs``.
        **kwargs: Airflow task context; ``kwargs['ti']`` is used for the pull.

    Returns:
        The pulled status string, or None if no matching XCom exists.

    NOTE(review): the load DAG shown with this question is declared with
    dag_id 'testing'. Confirm that ``source_dag_id`` matches the DAG that
    actually pushes 'dag_status'.
    """
    import logging

    dag_status = kwargs['ti'].xcom_pull(
        dag_id=source_dag_id,
        task_ids='push_dag_status_to_xcom',
        key='dag_status',
        # By default xcom_pull only matches XComs from this run's own
        # execution date. A separately triggered DAG never shares that date,
        # so the pull returned None; also look at earlier dates.
        include_prior_dates=True,
    )
    logging.info("dag_status pulled from %s: %s", source_dag_id, dag_status)
    return dag_status
# Fetch DAG: no schedule (schedule_interval=None), so it runs only when triggered.
dag = DAG(
    dag_id='fetch_dag_status_example',
    start_date=datetime(2023, 9, 27),
    schedule_interval=None,
    catchup=False,
    description='Fetch DAG status example',
)
# Task that pulls and reports the other DAG's 'dag_status' XCom.
fetch_dag_status_task = PythonOperator(
    task_id='fetch_dag_status',
    # The callable defined in this file is get_dag_status. No function named
    # fetch_dag_status exists, so referencing it raised NameError at DAG parse time.
    python_callable=get_dag_status,
    provide_context=True,
    dag=dag,
)
Thank you.
Upvotes: 0
Views: 76