sdot257

Reputation: 10366

Airflow - How to pass xcom variable into Python function

I need to reference a variable that's returned by a BashOperator. In my task_archive_s3_file, I need to get the filename from get_s3_file. The task simply prints {{ ti.xcom_pull(task_ids=submit_file_to_spark) }} as a string instead of the value.

If I use the bash_command, the value prints correctly.

get_s3_file = PythonOperator(
    task_id='get_s3_file',
    python_callable=obj.func_get_s3_file,
    trigger_rule=TriggerRule.ALL_SUCCESS,
    dag=dag)

submit_file_to_spark = BashOperator(
    task_id='submit_file_to_spark',
    bash_command="echo 'hello world'",
    trigger_rule="all_done",
    xcom_push=True,
    dag=dag)

task_archive_s3_file = PythonOperator(
    task_id='archive_s3_file',
#    bash_command="echo {{ ti.xcom_pull(task_ids='submit_file_to_spark') }}",
    python_callable=obj.func_archive_s3_file,
    params={'s3_path_filename': "{{ ti.xcom_pull(task_ids=submit_file_to_spark) }}" },
    dag=dag)

get_s3_file >> submit_file_to_spark >> task_archive_s3_file

Upvotes: 69

Views: 212824

Answers (7)

Daniel Huang

Reputation: 6538

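Templates like {{ ti.xcom_pull(...) }} can only be used inside parameters that support templating; anywhere else they are not rendered before execution and are passed through as literal strings. params is not a templated field, which is why the task in the question prints the raw template. See the template_fields, template_fields_renderers and template_ext attributes of the PythonOperator and BashOperator.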

So op_kwargs/op_args can be used to pass templates to your Python operator:

def func_archive_s3_file(s3_path_filename):
    archive(s3_path_filename)

task_archive_s3_file = PythonOperator(
    task_id='archive_s3_file',
    dag=dag,
    python_callable=obj.func_archive_s3_file,
    op_kwargs={ 's3_path_filename': "{{ ti.xcom_pull(task_ids='submit_file_to_spark') }}" })

You can also pass arguments using op_args as a list of positional arguments.
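To see why the template in op_kwargs is rendered while the same template in params stays a literal string, here is a dependency-free toy model of the template_fields mechanism. It is only a sketch: ToyPythonOperator, FakeTaskInstance and the regex-plus-eval render function are hypothetical stand-ins for Airflow's operator, TaskInstance and Jinja rendering, not Airflow's actual implementation (and eval is used purely to avoid a Jinja dependency here).

```python
import re

TEMPLATE = re.compile(r"\{\{\s*(.*?)\s*\}\}")


class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's TaskInstance."""

    def __init__(self, xcoms):
        self._xcoms = xcoms  # maps task_id -> pushed value

    def xcom_pull(self, task_ids):
        return self._xcoms[task_ids]


def render(value, context):
    """Toy replacement for Jinja: evaluate each {{ expr }} against context."""
    if isinstance(value, str):
        return TEMPLATE.sub(
            lambda m: str(eval(m.group(1), {"__builtins__": {}}, context)),
            value,
        )
    if isinstance(value, dict):
        return {k: render(v, context) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return type(value)(render(v, context) for v in value)
    return value


class ToyPythonOperator:
    # Only attributes named here are rendered before the callable runs.
    template_fields = ("op_args", "op_kwargs")

    def __init__(self, python_callable, op_args=(), op_kwargs=None, params=None):
        self.python_callable = python_callable
        self.op_args = op_args
        self.op_kwargs = op_kwargs or {}
        self.params = params or {}  # not in template_fields, so never rendered

    def run(self, context):
        for name in self.template_fields:
            setattr(self, name, render(getattr(self, name), context))
        return self.python_callable(*self.op_args, **self.op_kwargs)


template = "{{ ti.xcom_pull(task_ids='submit_file_to_spark') }}"
context = {"ti": FakeTaskInstance({"submit_file_to_spark": "hello world"})}

op = ToyPythonOperator(
    python_callable=lambda s3_path_filename: s3_path_filename,
    op_kwargs={"s3_path_filename": template},
    params={"s3_path_filename": template},
)
rendered = op.run(context)
print(rendered)                       # the value pulled from the fake XCom
print(op.params["s3_path_filename"])  # still the raw template string
```

The same template string ends up as a real value in op_kwargs and as unrendered text in params, which matches the behaviour described in the question.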

However, when all you need is an XCom value, an alternative is to use the TaskInstance object that Airflow makes available through the context:

def func_archive_s3_file(**context):
    archive(context['ti'].xcom_pull(task_ids='submit_file_to_spark'))

task_archive_s3_file = PythonOperator(
    task_id='archive_s3_file',
    dag=dag,
    python_callable=obj.func_archive_s3_file,
    provide_context=True)

Upvotes: 80

mitesh keswani

Reputation: 153

It took me a while to figure this out. I was returning {"notice": "end"}, and {{ ti.xcom_pull(task_ids=<task_id>) }} was failing. I finally realized that the returned value should not be a plain dict but a JSON string, for example the result of json.dumps(json_value). So my task function should return '{"notice": "end"}' for it to work. With that change, it worked for me.
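One plausible explanation, offered as an assumption rather than a confirmed diagnosis: when a dict passes through a string template it is rendered with Python's str(), which produces single-quoted keys that a JSON parser rejects. The stdlib-only sketch below shows the difference; the variable names are illustrative.

```python
import json

value = {"notice": "end"}

as_python_repr = str(value)  # what string rendering of a dict looks like
as_json = json.dumps(value)  # an explicit JSON string

print(as_python_repr)  # {'notice': 'end'}
print(as_json)         # {"notice": "end"}

# The JSON string round-trips cleanly back to the original dict.
round_tripped = json.loads(as_json)

# The Python repr is not valid JSON, so parsing it raises an error.
try:
    json.loads(as_python_repr)
    repr_is_valid_json = True
except json.JSONDecodeError:
    repr_is_valid_json = False

print(repr_is_valid_json)  # False
```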

Upvotes: 0

p13rr0m

Reputation: 1297

The Airflow BaseOperator defines an output property that you can use to access the XCom content of a given operator. Here is a concrete example:

with DAG(...):
    push_task = PythonOperator(
        task_id='push_task', 
        python_callable=lambda: 'Hello, World!')

    pull_task = PythonOperator(
        task_id='pull_task', 
        python_callable=lambda x: print(x),
        op_args=[push_task.output])

which should be almost equivalent to:

with DAG(...):
    push_task = PythonOperator(
        task_id='push_task', 
        python_callable=lambda: 'Hello, World!')

    pull_task = PythonOperator(
        task_id='pull_task', 
        python_callable=lambda x: print(x),
        op_args=["{{ task_instance.xcom_pull('push_task') }}"])

As far as I know, the only difference is that the former implicitly defines push_task >> pull_task.

Upvotes: 1

Galuoises

Reputation: 3283

If you want to pass an XCom to a BashOperator in Airflow 2, use env. Say you have pushed an XCom under the key my_xcom_var; you can then use Jinja inside env to pull the value, e.g.:

BashOperator(
    task_id="mytask",
    bash_command="echo ${MYVAR}",
    # env is a templated field, so the XCom value is rendered before the command runs
    env={"MYVAR": "{{ ti.xcom_pull(key='my_xcom_var') }}"},
    dag=dag
)

Check https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/operators/bash/index.html#module-airflow.operators.bash for more details
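One reason to prefer env over interpolating the value directly into bash_command is shell quoting: a value that arrives through an environment variable is not re-parsed by the shell. The sketch below demonstrates this with plain subprocess rather than Airflow, so it assumes a POSIX sh is available; the sample value is made up for illustration.

```python
import os
import subprocess

# Hypothetical XCom value containing characters the shell treats specially.
xcom_value = "report 2024; echo 'oops' $(whoami).csv"

result = subprocess.run(
    # The script only references the variable; the value is never part of it.
    ["sh", "-c", 'printf "%s" "$MYVAR"'],
    env={**os.environ, "MYVAR": xcom_value},
    capture_output=True,
    text=True,
    check=True,
)

print(result.stdout)  # the value comes back unchanged, nothing was executed
```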

Upvotes: 3

Noumenon

Reputation: 6412

In Airflow 2.0 (released December 2020), the TaskFlow API has made passing XComs easier. With this API, you can simply return values from functions annotated with @task, and they will be passed as XComs behind the scenes. Example from the tutorial:

    @task()
    def extract():
        ...
        return order_data_dict
    
    @task()
    def transform(order_data_dict: dict):
        ...
        return total_order_value

    order_data = extract()
    order_summary = transform(order_data)

In this example, order_data has type XComArg. It is a reference to the dictionary returned by the extract task rather than the dictionary itself. When the transform task runs, the reference is resolved, and the task receives the plain Python object that was stored.

Upvotes: 7

aaron

Reputation: 6489

Upvoted both the question and the answer, but I think that this can be made a little more clear for those users who just want to pass small data objects between PythonOperator tasks in their DAGs. Referencing this question and this XCom example got me to the following solution. Super simple:

from datetime import datetime
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

# Named dag (lowercase) so the imported DAG class is not shadowed.
dag = DAG(
  dag_id='example_dag',
  start_date=datetime.now(),
  schedule_interval='@once'
)

def push_function(**kwargs):
    ls = ['a', 'b', 'c']
    # The return value is pushed as an XCom automatically.
    return ls

push_task = PythonOperator(
    task_id='push_task', 
    python_callable=push_function,
    provide_context=True,
    dag=dag)

def pull_function(**kwargs):
    ti = kwargs['ti']
    ls = ti.xcom_pull(task_ids='push_task')
    print(ls)

pull_task = PythonOperator(
    task_id='pull_task', 
    python_callable=pull_function,
    provide_context=True,
    dag=dag)

push_task >> pull_task

I'm not sure why this works, but it does. A few questions for the community:

  • What's happening with ti here? How is that built in to **kwargs?
  • Is provide_context=True necessary for both functions?

Any edits to make this answer clearer are very welcome!

Upvotes: 83

dan

Reputation: 388

I used the same code and modified parameters such as start_date.

import airflow
from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

args = {
    'owner': 'Airflow',
    'start_date': airflow.utils.dates.days_ago(2),
}

DAG = DAG(
  dag_id='simple_xcom',
  default_args=args,
#  start_date=datetime(2019, 04, 21),
  schedule_interval="@daily",
  #schedule_interval=timedelta(1)
)

def push_function(**context):
    msg='the_message'
    print("message to push: '%s'" % msg)
    task_instance = context['task_instance']
    task_instance.xcom_push(key="the_message", value=msg)

push_task = PythonOperator(
    task_id='push_task', 
    python_callable=push_function,
    provide_context=True,
    dag=DAG)

def pull_function(**kwargs):
    ti = kwargs['ti']
    msg = ti.xcom_pull(task_ids='push_task',key='the_message')
    print("received message: '%s'" % msg)

pull_task = PythonOperator(
    task_id='pull_task', 
    python_callable=pull_function,
    provide_context=True,
    dag=DAG)

push_task >> pull_task

If you wonder where context['task_instance'] and kwargs['ti'] come from, refer to the Airflow macros documentation.
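As a rough mental model of where ti comes from (a toy sketch, not Airflow's code): before calling your function, the operator builds a context dictionary that holds the running task instance under both 'ti' and 'task_instance', and passes that dictionary as keyword arguments. The callable's return value is pushed as an XCom under the key 'return_value', which is also the default key for xcom_pull. ToyTaskInstance and run_task below are hypothetical names used only for illustration.

```python
class ToyTaskInstance:
    """Hypothetical stand-in for Airflow's TaskInstance."""

    def __init__(self, task_id, store):
        self.task_id = task_id
        self._store = store  # shared dict: (task_id, key) -> value

    def xcom_push(self, key, value):
        self._store[(self.task_id, key)] = value

    def xcom_pull(self, task_ids, key="return_value"):
        return self._store.get((task_ids, key))


def run_task(task_id, python_callable, store):
    """Toy version of what an operator does around your callable."""
    ti = ToyTaskInstance(task_id, store)
    context = {"ti": ti, "task_instance": ti}  # same object under both names
    result = python_callable(**context)        # this is how **kwargs gets 'ti'
    if result is not None:
        ti.xcom_push("return_value", result)   # implicit push of the return value
    return result


def push_function(**kwargs):
    return ["a", "b", "c"]


def pull_function(**kwargs):
    return kwargs["ti"].xcom_pull(task_ids="push_task")


store = {}
run_task("push_task", push_function, store)
pulled = run_task("pull_task", pull_function, store)
print(pulled)  # ['a', 'b', 'c']
```

In Airflow 1.x the context is only passed when provide_context=True is set; in Airflow 2 it is passed automatically, so that argument is no longer needed.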

Upvotes: 19
