Reputation: 636
I'm trying to call a function with the @task annotation N times, but I cannot define the task_id using this decorator. If I try to call it more than once, it says:
airflow.exceptions.DuplicateTaskIdFound: Task id 'my_task_group.make_request__1' has already been added to the DAG
@task
def make_request(params):
    return True

def my_first_function():
    # do stuff
    return make_request(params)

def my_second_function():
    # do stuff
    return make_request(params)

for i in range(0, 10):
    first = my_first_function()  # this will call "make_request"
    second = my_second_function()  # this will also call "make_request"
    first >> second
How can I "rename" the task_id dynamically on a @task annotation?
Upvotes: 5
Views: 6946
Reputation: 4873
The @task decorator lets Airflow generate the task_id dynamically each time the decorated function is called. The docstring of _get_unique_task_id states:
Generate unique task id given a DAG (or if run in a DAG context). Ids are generated by appending a unique number to the end of the original task id.
Example:
task_id
task_id__1
task_id__2
...
task_id__20
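To make the suffix scheme concrete, here is a rough sketch of the idea in plain Python. It is not Airflow's actual implementation, only an approximation of the behaviour the docstring describes; unique_task_id and simulate are hypothetical names made up for this illustration, and the real method also takes the DAG and task group into account.

```python
import re


def unique_task_id(task_id, existing_ids):
    """Return task_id unchanged if it is free, else append the next __N suffix.

    Hypothetical helper: approximates the documented behaviour, not Airflow's code.
    """
    if task_id not in existing_ids:
        return task_id
    # Strip an existing numeric suffix so "foo__1" and "foo" share the same core.
    core = re.sub(r"__\d+$", "", task_id)
    pattern = re.compile(rf"^{re.escape(core)}__(\d+)$")
    # Collect the suffix numbers already in use for this core name.
    suffixes = [int(m.group(1)) for m in map(pattern.match, existing_ids) if m]
    return f"{core}__{max(suffixes, default=0) + 1}"


def simulate(names, repeats):
    """Register each name `repeats` times, the way a loop in a DAG file would."""
    registered = []
    for _ in range(repeats):
        for name in names:
            registered.append(unique_task_id(name, registered))
    return registered


ids = simulate(["my_first_function", "my_second_function"], 3)
print(ids)
```

The first call keeps the plain name, and every later call gets the next free number, which matches the task_id, task_id__1, task_id__2 pattern listed above.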
With this feature there is no need to dynamically "rename" the tasks. In your code sample, you should decorate the functions that are being called in the loop, rather than the helper they share. Here is a working example, run on Airflow 2.0.1:
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from airflow.operators.python import get_current_context

default_args = {
    'owner': 'airflow',
}

@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(1), catchup=False, tags=['example'])
def task_decorator_example():

    def make_request(params):
        print(f"Params: {params}")

    def _print_task_id():
        context = get_current_context()
        print(f"Result: {context['ti'].task_id}")

    @task
    def my_first_function():
        _print_task_id()
        context = get_current_context()
        return make_request(context['params'])

    @task
    def my_second_function():
        _print_task_id()
        params = {'foo': 'bar'}
        return make_request(params)

    for i in range(0, 3):
        first = my_first_function()  # this will call "make_request"
        second = my_second_function()  # this will also call "make_request"
        first >> second

example_decorated_dag = task_decorator_example()
Which creates this graph view:
Each task prints its task_id and params. The combined log output looks like this:
- my_first_function
{logging_mixin.py:104} INFO - Result: my_first_function
{logging_mixin.py:104} INFO - Params: {}
- my_second_function
{logging_mixin.py:104} INFO - Result: my_second_function
{logging_mixin.py:104} INFO - Params: {'foo': 'bar'}
- my_first_function__1
{logging_mixin.py:104} INFO - Result: my_first_function__1
{logging_mixin.py:104} INFO - Params: {}
- my_second_function__1
{logging_mixin.py:104} INFO - Result: my_second_function__1
{logging_mixin.py:104} INFO - Params: {'foo': 'bar'}
- my_first_function__2
{logging_mixin.py:104} INFO - Result: my_first_function__2
{logging_mixin.py:104} INFO - Params: {}
Hope that works for you!
Upvotes: 4