Juho

Reputation: 1006

Accessing response from SimpleHttpOperator in another task

Related to this earlier question: suppose we have an Apache Airflow DAG with two tasks, first an HTTP request (a SimpleHttpOperator) and then a PythonOperator that does something with the response of the first task.

Using the Dog CEO API as an example, consider the following DAG:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.operators.python import PythonOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=1),
}
with DAG(
    'dog_api',
    default_args=default_args,
    description='Get nice dog pics',
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['dog'],
) as dag:
    get_dog = SimpleHttpOperator(
        task_id='get_dog',
        http_conn_id='dog_api', # NOTE: set up an HTTP connection called 'dog_api' with host 'https://dog.ceo/api'
        endpoint='/breeds/image/random',
        method="GET",
        # xcom_push=True # NOTE: no such argument in 2.2.0 but sometimes suggested by older guides online
    )
    
    def xcom_check(ds, **kwargs):
        val = kwargs['ti'].xcom_pull(key='return_value', task_ids='get_dog')
        return f"xcom_check has: {kwargs['ti']} and it says: {val}"
     
    inspect_dog = PythonOperator(
        task_id='inspect_dog',
        python_callable=xcom_check,
        provide_context=True
    )

We'd like to access the return value of get_dog inside xcom_check. According to the logs and the XCom view in the UI, get_dog does push its response to XCom under the key return_value (screenshot of the XCom value omitted).

However, this value does not reach the second task. The log of inspect_dog shows (among other things):

*redacted* Returned value was: xcom_check has: <TaskInstance: dog_api.inspect_dog manual__2021-10-30T16:27:23.081539+00:00 [running]> and it says: None

So the task instance available in xcom_check is "dog_api.inspect_dog", and pulling from "dog_api.get_dog" returns None. How can the response of get_dog be accessed? At the time of writing, the same question is asked in the comments of the previous question, upvoted, but unanswered. I also tried adapting this answer but can't figure out what I'm still doing differently.

Upvotes: 3

Views: 2032

Answers (1)

Elad Kalif

Reputation: 15979

Your problem is that you did not set a dependency between the tasks, so inspect_dog may run before, or in parallel with, get_dog. When that happens, inspect_dog sees no XCom value because get_dog has not pushed it yet.


You just need to set the dependency inside the with DAG block, after both tasks are defined:

get_dog >> inspect_dog


Log:

[2021-10-31, 07:07:21 UTC] {python.py:174} INFO - Done. Returned value was: xcom_check has: <TaskInstance: dog_api.inspect_dog manual__2021-10-31T07:05:27.721051+00:00 [running]> and it says: {"message":"https:\/\/images.dog.ceo\/breeds\/pointer-germanlonghair\/hans1.jpg","status":"success"}
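The value pulled in the log above looks like the raw response text (a JSON string), so the callable probably has to parse it before using it. A minimal sketch of how xcom_check could do that, assuming the response keeps the "message" and "status" fields shown in the log; the helper name extract_image_url is hypothetical and not part of Airflow:

```python
import json


def extract_image_url(raw_response):
    """Parse the JSON text returned by get_dog and return the image URL.

    Hypothetical helper; assumes the payload shape seen in the log above.
    """
    payload = json.loads(raw_response)
    if payload.get("status") != "success":
        raise ValueError(f"Dog API reported a failure: {payload!r}")
    return payload["message"]


def xcom_check(ds=None, **kwargs):
    # Pull the value that get_dog pushed under the default key.
    raw = kwargs["ti"].xcom_pull(key="return_value", task_ids="get_dog")
    if raw is None:
        # Typically means get_dog has not run yet, e.g. the dependency is missing.
        raise ValueError("No XCom value from get_dog; is it upstream of this task?")
    return extract_image_url(raw)
```

Raising on None makes a missing dependency fail loudly instead of silently returning "None" in the log.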

As for your comment in the code about xcom_push: the xcom_push parameter was used in older Airflow versions. It was replaced by do_xcom_push (see source code). Note that the default value of this parameter is True, so you do not need to set it explicitly.

Upvotes: 4
