AGaur
AGaur

Reputation: 305

use xcom pull to retrieve the variables pushed from other dag

I am very much new to airflow and using "xcom_push" and "xcom_pull" function.

I have two dags d1 which has task t1 and second dag d2 with task t2 .

Now i am pushing the values from dag d1 using:

kwargs['ti'].xcom_push(key='start_date',value=start_date)
kwargs['ti'].xcom_push(key='end_date',value=end_date)

and pulling the same start_date and end_date in dag d2 using :

start_date = kwargs['ti'].xcom_pull(dag_id = 'd1', task_ids='t1',key="start_date")
end_date = kwargs['ti'].xcom_pull(dag_id = 'd1', task_ids='t2' , key="end_Date")

However getting "NONETYPE"error during xcom_pull.Can anyone please help me how can i pull the values from dag d1 into the dag d2

Upvotes: 9

Views: 19725

Answers (3)

Oleksandr Baranov
Oleksandr Baranov

Reputation: 638

You need to pass additionally param include_prior_dates=True, so that it would check XCom from previous dates.

:param include_prior_dates: If False, only XCom from the current execution_date are returned. If True, XCom from previous dates are returned as well.

in your case looks like that:

start_date = kwargs['ti'].xcom_pull(dag_id = 'd1', task_ids='t1',key="start_date", include_prior_dates=True)
end_date = kwargs['ti'].xcom_pull(dag_id = 'd1', task_ids='t2' , key="end_Date", include_prior_dates=True)

Upvotes: 13

SunnyAk
SunnyAk

Reputation: 588

XCOM's are used to exchange information between tasks of a single Dag. The use case where you would like to retrieve output of a Dag for another Dag can be accomplished by storing the results in a file/database and proceeding accordingly.

Upvotes: -2

Zack
Zack

Reputation: 2466

Are you having anything actually set the xcom variables?

Try the dags below:

d1

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.models import Variable
from datetime import datetime, timedelta

args = {
    'owner': 'znovak',
    'email': ['[email protected]'],
    'depends_on_past': False,
    'email_on_retry': False,
    'start_date': datetime(2019, 11, 4)
}

dag = DAG(
    dag_id='d1',
    default_args=args,
    catchup=False,
    schedule_interval=None
    )

###############################
##### Create DAG Parameters ###
###############################
def set_xcom_params(**kwargs):
    kwargs['ti'].xcom_push(key='start_date',value=start_date)
    kwargs['ti'].xcom_push(key='end_date',value=end_date)

t1 = PythonOperator(
    task_id='t1',
    python_callable=set_xcom_params,
    dag=dag,
    provide_context=True
    )

d2

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.models import Variable
from datetime import datetime, timedelta

args = {
    'owner': 'znovak',
    'email': ['[email protected]'],
    'depends_on_past': False,
    'email_on_retry': False,
    'start_date': datetime(2019, 11, 4)
}

dag = DAG(
    dag_id='d2',
    default_args=args,
    catchup=False,
    schedule_interval=None
    )

def pull_xcom_params(**kwargs):
    start_date = kwargs['ti'].xcom_pull(dag_id='d1',task_ids='t1',key="start_date")
    end_date = kwargs['ti'].xcom_pull(dag_id='d1',task_ids='t1',key="end_date")
    print(start_date)
    print(end_date)

t2 = PythonOperator(
    task_id='t2',
    python_callable=pull_xcom_params,
    dag=dag,
    provide_context=True
    )

Upvotes: 0

Related Questions