Salim Mzoughi

Reputation: 41

Airflow XCOMs communication from BashOperator to PythonOperator

I'm new to Apache Airflow and am trying to write my first DAG, in which one task depends on the output of another (using ti.xcom_pull).

PS: I run Airflow in WSL Ubuntu 20.04 using VS Code.

I created a first task (task_id = "get_datetime") that runs the "date" bash command, and it works.

Then I created a second task (task_id='process_datetime') that takes the datetime from the first task and processes it. I set the python_callable, and everything looks fine.

The issue is that dt = ti.xcom_pull gives a NoneType when I run "airflow tasks test first_ariflow_dag process_datetime 2022-11-1" in the terminal, yet when I check the log in the Airflow UI, the task works normally. Could someone suggest a solution, please?

`

from datetime import datetime
from airflow.models import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator


def process_datetime(ti):
    dt = ti.xcom_pull(task_ids=['get_datetime'])
    if not dt :
        raise Exception('No datetime value')
    
    dt = str(dt[0]).split()

    return{
        'year':int(dt[-1]),
        'month':dt[1],
        'day':int(dt[2]),
        'time':dt[3],
        'day_of_week':dt[0]
    }

with DAG(
    dag_id='first_ariflow_dag',
    schedule_interval='* * * * *',
    start_date=datetime(year=2022, month=11, day=1),
    catchup=False

) as dag:
    # 1. Get the current datetime
    task_get_datetime= BashOperator(
        task_id = 'get_datetime',
        bash_command='date'
        )

    # 2. Process the datetime
    task_process_datetime= PythonOperator(
        task_id = 'process_datetime',
        python_callable=process_datetime
    )

`

I get this error:

[2022-11-02 00:51:45,420] {taskinstance.py:1851} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/mnt/c/Users/Salim/Desktop/A-Learning/Airflow_Conda/airflow_env/lib/python3.8/site-packages/airflow/operators/python.py", line 175, in execute
    return_value = self.execute_callable()
  File "/mnt/c/Users/Salim/Desktop/A-Learning/Airflow_Conda/airflow_env/lib/python3.8/site-packages/airflow/operators/python.py", line 193, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/home/salim/airflow/dags/first_dag.py", line 12, in process_datetime
    raise Exception('No datetime value')
Exception: No datetime value

Upvotes: 0

Views: 1178

Answers (2)

sharat

Reputation: 1

I had the exact same issue with Airflow 2.6.3: the Airflow UI did not show the XCom pushed from the BashOperator to the PythonOperator. Upgrading to 2.7.1 fixed this, and I was then able to see the XCom in the Airflow UI as well.

Upvotes: 0

GuziQ

Reputation: 121

According to the documentation, to push data to XCom you need to set the do_xcom_push parameter (Airflow 2) or xcom_push (Airflow 1).

If BaseOperator.do_xcom_push is True, the last line written to stdout will also be pushed to an XCom when the bash command completes

BashOperator should look like this:

task_get_datetime = BashOperator(
    task_id='get_datetime',
    bash_command='date',
    do_xcom_push=True
)
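
To check the Python side separately from Airflow, here is a minimal sketch that exercises the question's process_datetime callable against a stand-in task instance. FakeTaskInstance is hypothetical and is not part of Airflow. It assumes that pulling with a list of task_ids returns a list holding only the values that were actually pushed, so a missing upstream XCom gives an empty result. The sample string is an assumed example of what the "date" command prints.

```python
from typing import Any, Dict, List


class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's task instance (local testing only)."""

    def __init__(self, xcoms: Dict[str, Any]):
        # Maps task_id -> the value that task is assumed to have pushed.
        self._xcoms = xcoms

    def xcom_pull(self, task_ids: List[str]) -> List[Any]:
        # Assumption: only values that were actually pushed are returned,
        # so a missing upstream XCom yields an empty list.
        return [self._xcoms[t] for t in task_ids if t in self._xcoms]


def process_datetime(ti) -> Dict[str, Any]:
    # Same logic as the callable in the question.
    dt = ti.xcom_pull(task_ids=['get_datetime'])
    if not dt:
        raise Exception('No datetime value')

    parts = str(dt[0]).split()
    return {
        'year': int(parts[-1]),
        'month': parts[1],
        'day': int(parts[2]),
        'time': parts[3],
        'day_of_week': parts[0],
    }


# Assumed sample of the last stdout line produced by the "date" command.
sample = 'Wed Nov  2 00:51:45 UTC 2022'
parsed = process_datetime(FakeTaskInstance({'get_datetime': sample}))
print(parsed)
```

If the parsing works here but the task still raises 'No datetime value' in Airflow, the callable itself is probably fine and the pull is simply finding nothing stored for get_datetime in that run.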

Upvotes: 1
