Alex K

Reputation: 139

Airflow - How to increase a value stored in XCom

I have fetched a value from a database and stored it in XCom, and I would like to increase it by 1. I have tried to increment it with the following approaches, without any luck. Is it possible to increase a value stored in XCom?

'{{ ti.xcom_pull("task_id") + 1}}'

'{{ int(ti.xcom_pull("task_id")) + 1}}'

EDIT

Here is part of my Airflow DAG. I have one task that extracts data from HBase:

pull_data_hbase = BashOperator(
    task_id='pull_data_hbase',
    dag=dag,
    bash_command=<My_command_for_exract_data_from_hbase>,
    xcom_push=True)

Another task updates the table with the value incremented by 1:

data_to_hbase = BashOperator(
    task_id='data_to_hbase',
    dag=dag,
    bash_command=<Command_for_update_table_with_XCom_value>
                 % ('{{ ti.xcom_pull("pull_data_hbase") +1 }}')
)

When I use '{{ int(ti.xcom_pull("task_id")) + 1}}', I get the following message:

[2022-01-13 20:39:47,104] {base_task_runner.py:101} INFO - Job 3868282: Subtask print_prev_task ('type:', "{{ ti.xcom_pull('pull_data_hbase') }}")

[2022-01-13 20:39:47,105] {base_task_runner.py:101} INFO - Job 3868282: Subtask print_prev_task [2022-01-13 20:39:47,103] {cli.py:520} INFO - Running <TaskInstance: tv_paramount_monthly_report2.0.7-SNAPSHOT.print_prev_task 2021-11-15T00:00:00+00:00 [running]> on host dl100ven01.ddc.teliasonera.net

[2022-01-13 20:39:47,159] {models.py:1788} ERROR - 'int' is undefined

Upvotes: 0

Views: 1056

Answers (2)

Bas Harenslak

Reputation: 3094

Python's built-in functions and libraries are not available inside Jinja templates, which is why int is reported as undefined. The TL;DR answer is:

"{{ ti.xcom_pull('pull_data_hbase') | int + 1 }}"

You can use certain functions in Jinja templates; these are called "macros". Airflow provides several macros out of the box: https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html#macros. You can also supply your own macros, as shown by @Hitobat.

The other thing you can use in Jinja templates is "filters" (see built-in filters). These are applied with a pipe (|), as shown above with the int filter.
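To see the difference outside Airflow, here is a minimal sketch that renders both expressions with plain Jinja. It assumes the jinja2 package is installed (Airflow depends on it) and uses a hypothetical variable named value in place of the ti.xcom_pull(...) call. The value is a string here because BashOperator pushes the last line of stdout as text.

```python
from jinja2 import Environment, UndefinedError

env = Environment()

# Hypothetical stand-in for the XCom value; it arrives as a string.
context = {"value": "41"}

# The int filter converts the string first, then 1 is added.
rendered = env.from_string("{{ value | int + 1 }}").render(context)
print(rendered)  # 42

# Python's built-in int() is not a name Jinja knows, so calling it fails.
try:
    env.from_string("{{ int(value) + 1 }}").render(context)
    error_message = None
except UndefinedError as exc:
    error_message = str(exc)
print(error_message)
```

The filter binds more tightly than the addition, so the expression is read as (value | int) + 1 and needs no extra parentheses.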

Upvotes: 1

Hitobat

Reputation: 3037

You can write an actual Python function and pass it to your DAG as a macro.

The function can then be called from any templated Airflow value. The key in the user_defined_macros dict is the name used in the template.

For example:


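# Import paths assume Airflow 1.10, which matches the xcom_push argument below.
from airflow import DAG
from airflow.operators.bash_operator import BashOperator


def increment(task_instance, task_id):
    # Pull the XCom value pushed by task_id and add 1 in plain Python.
    return int(task_instance.xcom_pull(task_id)) + 1


with DAG(
    dag_id='dag_id',
    # The dict key is the name the template uses to call the function.
    user_defined_macros={'increment': increment},
) as dag:

    pull_data_hbase = BashOperator(
        task_id='pull_data_hbase',
        dag=dag,
        # "task_id" stands for the ID of the task that pushed the XCom value.
        bash_command='echo x+1={{ increment(ti, "task_id") }}',
        xcom_push=True,
    )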
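The macro is an ordinary Python function, so it can be checked without a running scheduler. The sketch below is only an illustration: FakeTaskInstance is a hypothetical stub, not an Airflow class, and it returns the stored value as a string to mimic what a BashOperator pushes to XCom.

```python
class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's TaskInstance, for local checks only."""

    def __init__(self, xcoms):
        # Maps a task ID to the value that task "pushed".
        self._xcoms = xcoms

    def xcom_pull(self, task_ids):
        return self._xcoms[task_ids]


def increment(task_instance, task_id):
    # Same function as in the DAG: convert the pulled value, then add 1.
    return int(task_instance.xcom_pull(task_id)) + 1


ti = FakeTaskInstance({"pull_data_hbase": "41"})
result = increment(ti, "pull_data_hbase")
print(result)  # 42
```

Because the conversion happens in Python rather than in the template, the function is also a convenient place to handle a missing or non-numeric value if that can occur.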

Upvotes: 1
