Reputation: 41
I'm new to Apache Airflow and am trying to write my first DAG, which has a task that depends on the output of another task (using ti.xcom_pull).
PS: I run Airflow in WSL Ubuntu 20.04 using VS Code.
I created a first task (task_id='get_datetime') that runs the "date" bash command, and it works.
Then I created a second task (task_id='process_datetime') that takes the datetime from the first task and processes it. I set the python_callable, and everything seems fine.
The issue is that dt = ti.xcom_pull(...) returns None when I run "airflow tasks test first_ariflow_dag process_datetime 2022-11-1" in the terminal, but when I check the log in the Airflow UI, the task works normally. Could someone suggest a solution, please?
```
from datetime import datetime

from airflow.models import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator


def process_datetime(ti):
    dt = ti.xcom_pull(task_ids=['get_datetime'])
    if not dt:
        raise Exception('No datetime value')
    dt = str(dt[0]).split()
    return {
        'year': int(dt[-1]),
        'month': dt[1],
        'day': int(dt[2]),
        'time': dt[3],
        'day_of_week': dt[0]
    }


with DAG(
    dag_id='first_ariflow_dag',
    schedule_interval='* * * * *',
    start_date=datetime(year=2022, month=11, day=1),
    catchup=False
) as dag:
    # 1. Get the current datetime
    task_get_datetime = BashOperator(
        task_id='get_datetime',
        bash_command='date'
    )
    # 2. Process the datetime
    task_process_datetime = PythonOperator(
        task_id='process_datetime',
        python_callable=process_datetime
    )
```
I get this error:
[2022-11-02 00:51:45,420] {taskinstance.py:1851} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/mnt/c/Users/Salim/Desktop/A-Learning/Airflow_Conda/airflow_env/lib/python3.8/site-packages/airflow/operators/python.py", line 175, in execute
    return_value = self.execute_callable()
  File "/mnt/c/Users/Salim/Desktop/A-Learning/Airflow_Conda/airflow_env/lib/python3.8/site-packages/airflow/operators/python.py", line 193, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/home/salim/airflow/dags/first_dag.py", line 12, in process_datetime
    raise Exception('No datetime value')
Exception: No datetime value
Upvotes: 0
Views: 1178
Reputation: 1
I had the exact same issue with Airflow 2.6.3: the Airflow UI did not show the XCom pushed from the BashOperator to the PythonOperator. Upgrading to 2.7.1 fixed this, and I was then able to see the XCom in the Airflow UI as well.
Upvotes: 0
Reputation: 121
According to the documentation, pushing data to XCom is controlled by the parameter do_xcom_push (Airflow 2) or xcom_push (Airflow 1). In Airflow 2, do_xcom_push defaults to True, so setting it explicitly mainly makes the intent visible. The documentation says:
"If BaseOperator.do_xcom_push is True, the last line written to stdout will also be pushed to an XCom when the bash command completes."
The BashOperator should look like this:
task_get_datetime = BashOperator(
    task_id='get_datetime',
    bash_command='date',
    do_xcom_push=True
)
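To check the parsing logic on its own, separately from whether the XCom was actually pushed, the callable from the question can be run against a stand-in for `ti`. This is only a sketch: `FakeTaskInstance` is a hypothetical helper, not part of Airflow, and it assumes only the behaviour the question's code relies on (a list of task ids yields a sequence of values, and nothing comes back when no XCom exists). The sample string is an assumed example of `date` output.

```python
class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's task instance (not an Airflow class)."""

    def __init__(self, xcoms):
        # Maps task_id -> the value that task would have pushed to XCom.
        self._xcoms = xcoms

    def xcom_pull(self, task_ids):
        # Assumed behaviour: a single string id yields one value,
        # a list of ids yields a list of the values that exist.
        if isinstance(task_ids, str):
            return self._xcoms.get(task_ids)
        return [self._xcoms[t] for t in task_ids if t in self._xcoms]


def process_datetime(ti):
    # Same logic as the callable in the question.
    dt = ti.xcom_pull(task_ids=['get_datetime'])
    if not dt:
        raise Exception('No datetime value')
    dt = str(dt[0]).split()
    return {
        'year': int(dt[-1]),
        'month': dt[1],
        'day': int(dt[2]),
        'time': dt[3],
        'day_of_week': dt[0],
    }


# Case 1: the upstream XCom exists (sample `date` output, assumed format).
parsed = process_datetime(
    FakeTaskInstance({'get_datetime': 'Wed Nov  2 00:51:45 UTC 2022'})
)
print(parsed)

# Case 2: no XCom was pushed for this run, which reproduces the error above.
try:
    process_datetime(FakeTaskInstance({}))
    error_message = None
except Exception as exc:
    error_message = str(exc)
print(error_message)
```

If the first case parses correctly, the "No datetime value" exception points to the pull finding no XCom for that run rather than to a problem in the parsing code.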
Upvotes: 1