Indrid

Reputation: 1182

Airflow 2 Push Xcom with Key Name

In the Airflow 2 TaskFlow API I can easily push and pull XCom values between tasks, using code like the following:

@task(task_id="task_one")
def get_height() -> int:
    response = requests.get("https://swapi.dev/api/people/4")
    data = json.loads(response.text)
    height = int(data["height"])
    return height

@task(task_id="task_two")
def check_height(val):
    # Show val:
    print(f"Value passed in is: {val}")

check_height(get_height())

I can see that the val passed into check_height is 202 and that it is stored under the default XCom key 'return_value'. That's fine some of the time, but I generally prefer to use specific keys.

My question is how can I push the XCom with a named key? This was really easy previously with ti.xcom_push where you could just supply the key name you wanted the value to be stuffed into, but I can't quite put my finger on how to achieve this in the taskflow api workflow.

Would appreciate any pointers or (simple, please!) examples on how to do this.

Upvotes: 3

Views: 5385

Answers (1)

Elad Kalif

Reputation: 15931

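You can accept ti as a keyword argument of the decorated function (not of the decorator itself); Airflow passes in the task instance at run time:

@task(task_id="task_one")
def get_height(ti=None) -> int:
    response = requests.get("https://swapi.dev/api/people/4")
    data = json.loads(response.text)
    height = int(data["height"])

    # Push the value under a named XCom key
    ti.xcom_push(key="my_key", value=height)

    # Returning the value still pushes it under the default key 'return_value'
    return height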

If you need the context deep inside nested function calls, you can also use get_current_context. I'll use it in the example below just to demonstrate it, but it's not really required in your case.

Here is a working example:

import json
from datetime import datetime

import requests

from airflow.decorators import dag, task
from airflow.operators.python import get_current_context

DEFAULT_ARGS = {"owner": "airflow"}


@dag(dag_id="stackoverflow_dag", default_args=DEFAULT_ARGS, schedule_interval=None, start_date=datetime(2020, 2, 2))
def my_dag():

    @task(task_id="task_one")
    def get_height() -> int:
        response = requests.get("https://swapi.dev/api/people/4")
        data = json.loads(response.text)
        height = int(data["height"])

        # Handle named Xcom
        context = get_current_context()
        ti = context["ti"]
        ti.xcom_push("my_key", height)

        return height

    @task(task_id="task_two")
    def check_height(val):
        # Show val:
        print(f"Value passed in is: {val}")

        # Read from the named XCom (task_ids selects the task, key selects the entry)
        context = get_current_context()
        ti = context["ti"]
        named_val = ti.xcom_pull(task_ids="task_one", key="my_key")
        print(f"Value passed from xcom my_key is: {named_val}")

    check_height(get_height())

my_dag = my_dag()

This pushes two XComs from task_one: one for the returned value (key return_value) and one under the key we chose (my_key).

Both values are then printed in the downstream task_two.
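To see why the key matters on the pull side, here is a toy, Airflow-free model of the lookup. ToyXComStore and its push/pull methods are hypothetical names made up for illustration, not Airflow APIs; the only thing borrowed from Airflow is the default key name return_value. Treat it as a rough sketch of the idea that an XCom is addressed by task id plus key, not as a description of how Airflow stores XComs internally.

```python
from typing import Any, Dict, Optional, Tuple

DEFAULT_KEY = "return_value"  # same name as Airflow's default XCom key


class ToyXComStore:
    """Hypothetical in-memory stand-in: values are indexed by (task_id, key)."""

    def __init__(self) -> None:
        self._rows: Dict[Tuple[str, str], Any] = {}

    def push(self, task_id: str, value: Any, key: str = DEFAULT_KEY) -> None:
        # A later push with the same (task_id, key) overwrites the earlier one.
        self._rows[(task_id, key)] = value

    def pull(self, task_id: str, key: str = DEFAULT_KEY) -> Optional[Any]:
        # Returns None when nothing was pushed under this (task_id, key).
        return self._rows.get((task_id, key))


store = ToyXComStore()
store.push("task_one", 202)                # roughly what `return height` does
store.push("task_one", 202, key="my_key")  # roughly what the named xcom_push does

default_value = store.pull("task_one")                   # no key: default key is used
named_value = store.pull("task_one", key="my_key")       # explicit key
missing_value = store.pull("task_one", key="other_key")  # never pushed

print(default_value, named_value, missing_value)
```

In the real DAG the matching call is ti.xcom_pull(task_ids="task_one", key="my_key"); if key is left out, the pull falls back to return_value.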

Upvotes: 3
