Reputation: 139
I have fetched a value from a database and stored it in XCom which I would like to increase with 1. I have tried to increment it with following approaches without any luck. Is it possible to increase a value stored in XCom?
'{{ ti.xcom_pull("task_id") + 1}}'
'{{ int(ti.xcom_pull("task_id")) + 1}}'
EDIT
Here is part of my airflow DAG. I have one task that extract data from Hbase:
pull_data_hbase = BashOperator(
task_id='pull_data_hbase',
dag=dag,
bash_command=<My_command_for_exract_data_from_hbase>,
xcom_push=True)
Another task for update the table with increment 1:
data_to_hbase = BashOperator(
task_id='data_to_hbase',
dag=dag,
bash_command=<Command_for_update_table_with_XCom_value>
% ('{{ ti.xcom_pull("pull_data_hbase") +1 }}')
)
when I am using '{{ int(ti.xcom_pull("task_id")) + 1}}'
I get the following message:
[2022-01-13 20:39:47,104] {base_task_runner.py:101} INFO - Job 3868282: Subtask print_prev_task ('type:', "{{ ti.xcom_pull('pull_data_hbase') }}") [2022-01-13 20:39:47,105] {base_task_runner.py:101} INFO - Job 3868282: Subtask print_prev_task [2022-01-13 20:39:47,103] {cli.py:520} INFO - Running <TaskInstance: tv_paramount_monthly_report2.0.7-SNAPSHOT.print_prev_task 2021-11-15T00:00:00+00:00 [running]> on host dl100ven01.ddc.teliasonera.net
[2022-01-13 20:39:47,159] {models.py:1788} ERROR - 'int' is undefined
Upvotes: 0
Views: 1056
Reputation: 3094
You don't have access to Python libraries/functions inside Jinja templates. The TLDR answer is:
"{{ ti.xcom_pull('pull_data_hbase') | int + 1 }}"
You can use certain functions in Jinja templates, these are called "macros" in Jinja. Airflow provides several macros out of the box: https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html#macros. You can also supply your own macros as shown by @Hitobat.
The other thing you can use in Jinja templates are "filters" (see built-in filters). These can be applied with a pipe (|
), as shown above using the int
filter.
Upvotes: 1
Reputation: 3037
You can write an actual Python function, and pass this in your DAG as a macro.
Then the function can be callable from airflow templated value. The name of the key in user macro dict is the name used from template.
eg.
def increment(task_instance, task_id):
return int(task_instance.xcom_pull(task_id)) + 1
with DAG(
dag_id='dag_id',
user_defined_macros={'increment': increment},
) as dag:
pull_data_hbase = BashOperator(
task_id='pull_data_hbase',
dag=dag,
bash_command='echo x+1={{ increment(ti, "task_id") }}',
xcom_push=True,
)
Upvotes: 1