Reputation: 305
I am very new to Airflow and to the xcom_push and xcom_pull functions.
I have two DAGs: d1, which has task t1, and d2, which has task t2.
I am pushing the values from DAG d1 using:
kwargs['ti'].xcom_push(key='start_date',value=start_date)
kwargs['ti'].xcom_push(key='end_date',value=end_date)
and pulling the same start_date and end_date in DAG d2 using:
start_date = kwargs['ti'].xcom_pull(dag_id = 'd1', task_ids='t1',key="start_date")
end_date = kwargs['ti'].xcom_pull(dag_id = 'd1', task_ids='t2' , key="end_Date")
However, I am getting a "NoneType" error during xcom_pull. Can anyone help me pull the values from DAG d1 into DAG d2?
Upvotes: 9
Views: 19725
Reputation: 638
You additionally need to pass the parameter include_prior_dates=True, so that it also checks XComs from previous dates.
:param include_prior_dates: If False, only XCom from the current execution_date are returned. If True, XCom from previous dates are returned as well.
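In your case it would look like this (note that both values are pulled from task t1 of d1, and that the key end_date is spelled the same way as when it was pushed):
start_date = kwargs['ti'].xcom_pull(dag_id='d1', task_ids='t1', key='start_date', include_prior_dates=True)
end_date = kwargs['ti'].xcom_pull(dag_id='d1', task_ids='t1', key='end_date', include_prior_dates=True)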
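To see why the flag matters, here is a toy model of the lookup rule quoted in the :param description above. It is not Airflow code: the ToyXComStore class and its methods are made up for illustration, and the only thing it tries to mirror is "exact execution_date match by default, earlier dates allowed when include_prior_dates=True".

```python
from datetime import datetime


class ToyXComStore:
    """Hypothetical in-memory stand-in for the XCom table (not Airflow's API)."""

    def __init__(self):
        self._rows = []

    def push(self, dag_id, task_id, key, execution_date, value):
        self._rows.append((dag_id, task_id, key, execution_date, value))

    def pull(self, dag_id, task_id, key, execution_date, include_prior_dates=False):
        matches = [
            (row_date, value)
            for (d, t, k, row_date, value) in self._rows
            if d == dag_id and t == task_id and k == key
            and (row_date <= execution_date if include_prior_dates
                 else row_date == execution_date)
        ]
        if not matches:
            return None
        # The most recent matching run wins.
        return max(matches, key=lambda m: m[0])[1]


store = ToyXComStore()
d1_run = datetime(2019, 11, 4, 10, 0)   # d1 ran first
d2_run = datetime(2019, 11, 4, 10, 5)   # d2 ran five minutes later
store.push('d1', 't1', 'start_date', d1_run, '2019-11-01')

exact = store.pull('d1', 't1', 'start_date', d2_run)
prior = store.pull('d1', 't1', 'start_date', d2_run, include_prior_dates=True)
print(exact)  # None
print(prior)  # 2019-11-01
```

Because d1 and d2 are separate DAG runs, their execution dates will usually differ, which is the situation the first pull models.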
Upvotes: 13
Reputation: 588
XComs are primarily meant for exchanging information between tasks of a single DAG. If you want one DAG to use the output of another DAG, you can store the results in a file or database and have the second DAG read them from there.
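A minimal sketch of the file-based approach, assuming both DAGs run on a machine (or shared volume) that can see the same path. SHARED_PATH, save_params and load_params are hypothetical names chosen for this example; a database table would follow the same write-then-read pattern.

```python
import json
import os
import tempfile

# Hypothetical shared location; both DAGs must be able to reach it.
SHARED_PATH = os.path.join(tempfile.gettempdir(), "d1_params.json")


def save_params(start_date, end_date, path=SHARED_PATH):
    """Intended for a task in d1: write the values to the shared file."""
    tmp_path = path + ".tmp"
    with open(tmp_path, "w") as fh:
        json.dump({"start_date": start_date, "end_date": end_date}, fh)
    # Rename into place so a reader never sees a half-written file.
    os.replace(tmp_path, path)


def load_params(path=SHARED_PATH):
    """Intended for a task in d2: return the dict, or None if nothing was saved."""
    if not os.path.exists(path):
        return None
    with open(path) as fh:
        return json.load(fh)
```

A task in d1 would call save_params(...) from its python_callable, and a task in d2 would call load_params() and handle the None case when d1 has not run yet.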
Upvotes: -2
Reputation: 2466
Is anything actually setting the XCom values?
Try the DAGs below:
d1
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.models import Variable
from datetime import datetime, timedelta
args = {
    'owner': 'znovak',
    'email': ['[email protected]'],
    'depends_on_past': False,
    'email_on_retry': False,
    'start_date': datetime(2019, 11, 4)
}
dag = DAG(
    dag_id='d1',
    default_args=args,
    catchup=False,
    schedule_interval=None
)
###############################
##### Create DAG Parameters ###
###############################
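def set_xcom_params(**kwargs):
    # Placeholder values for illustration only; replace with your own dates.
    start_date = '2019-11-01'
    end_date = '2019-11-04'
    # Push both values so that other tasks can pull them by key.
    kwargs['ti'].xcom_push(key='start_date', value=start_date)
    kwargs['ti'].xcom_push(key='end_date', value=end_date)

t1 = PythonOperator(
    task_id='t1',
    python_callable=set_xcom_params,
    dag=dag,
    provide_context=True
)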
d2
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.models import Variable
from datetime import datetime, timedelta
args = {
    'owner': 'znovak',
    'email': ['[email protected]'],
    'depends_on_past': False,
    'email_on_retry': False,
    'start_date': datetime(2019, 11, 4)
}
dag = DAG(
    dag_id='d2',
    default_args=args,
    catchup=False,
    schedule_interval=None
)
def pull_xcom_params(**kwargs):
    start_date = kwargs['ti'].xcom_pull(dag_id='d1', task_ids='t1', key="start_date")
    end_date = kwargs['ti'].xcom_pull(dag_id='d1', task_ids='t1', key="end_date")
    print(start_date)
    print(end_date)

t2 = PythonOperator(
    task_id='t2',
    python_callable=pull_xcom_params,
    dag=dag,
    provide_context=True
)
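If the pull still comes back empty, the None tends to surface later as a confusing NoneType error. One option is a small guard that fails at the pull itself. This is only a sketch: pull_required is a hypothetical helper, and it assumes ti is the task instance from kwargs['ti'] and that xcom_pull accepts the dag_id, task_ids and key keyword arguments used in the DAGs above.

```python
def pull_required(ti, dag_id, task_id, key, **pull_kwargs):
    """Pull an XCom value and raise a descriptive error instead of returning None."""
    value = ti.xcom_pull(dag_id=dag_id, task_ids=task_id, key=key, **pull_kwargs)
    if value is None:
        raise ValueError(
            "No XCom found for dag_id=%r, task_id=%r, key=%r" % (dag_id, task_id, key)
        )
    return value
```

Inside pull_xcom_params it could be used as pull_required(kwargs['ti'], 'd1', 't1', 'start_date'), with any extra keyword arguments passed straight through to xcom_pull.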
Upvotes: 0