Rafa Acioly

Reputation: 636

How to generate a different ID for a task in Airflow?

I'm trying to call a function decorated with @task N times, but I cannot define the task_id using this decorator. If I call it more than once, it fails with:

airflow.exceptions.DuplicateTaskIdFound: Task id 'my_task_group.make_request__1' has already been added to the DAG

@task
def make_request(params):
    return True

def my_first_function():
    # do stuff
    return make_request(params)

def my_second_function():
    # do stuff
    return make_request(params)

for i in range(0, 10):
    first = my_first_function()  # this will call "make_request"
    second = my_second_function()  # this will also call "make_request"

    first >> second

How can I dynamically "rename" the task_id when using the @task decorator?

Upvotes: 5

Views: 6946

Answers (1)

NicoE

Reputation: 4873

The @task decorator generates the task_id dynamically each time the decorated function is called. The docstring of _get_unique_task_id states:

Generate unique task id given a DAG (or if run in a DAG context). Ids are generated by appending a unique number to the end of the original task id.

Example:
task_id
task_id__1
task_id__2
...
task_id__20
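
To make the numbering scheme concrete, here is a minimal pure-Python sketch of the suffixing behaviour described in that docstring. It is not Airflow's actual implementation: the names unique_task_id and simulate_calls are hypothetical, and the sketch assumes the set of ids already in the DAG is passed in explicitly instead of being read from a DAG context.

```python
import re


def unique_task_id(task_id, existing_ids):
    """Return task_id unchanged if it is free, else append the next __N suffix.

    Hypothetical helper for illustration only, not part of Airflow.
    """
    if task_id not in existing_ids:
        return task_id
    # Strip any trailing "__<number>" to recover the base id.
    core = re.split(r"__\d+$", task_id)[0]
    pattern = re.compile(rf"^{re.escape(core)}__(\d+)$")
    # Collect the numeric suffixes already used for this base id.
    suffixes = []
    for existing in existing_ids:
        match = pattern.match(existing)
        if match:
            suffixes.append(int(match.group(1)))
    return f"{core}__{max(suffixes, default=0) + 1}"


def simulate_calls(task_id, n):
    """Mimic calling the same decorated function n times in one DAG."""
    ids = []
    for _ in range(n):
        ids.append(unique_task_id(task_id, ids))
    return ids


print(simulate_calls("my_first_function", 3))
```

Under these assumptions, the first call keeps the plain id and each later call gets the next free number, which matches the task ids that appear in the logs further down.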

With this feature there is no need to dynamically "rename" the tasks. In your code sample, you should decorate the functions that are called in the loop. Here is a working example, tested on Airflow 2.0.1:

from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from airflow.operators.python import get_current_context

default_args = {
    'owner': 'airflow',
}

@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(1), catchup=False, tags=['example'])
def task_decorator_example():

    def make_request(params):
        print(f"Params: {params}")

    def _print_task_id():
        context = get_current_context()
        print(f"Result: {context['ti'].task_id}")

    @task
    def my_first_function():
        _print_task_id()
        context = get_current_context()
        return make_request(context['params'])

    @task
    def my_second_function():
        _print_task_id()
        params = {'foo': 'bar'}
        return make_request(params)

    for i in range(0, 3):
        first = my_first_function()  # this will call "make_request"
        second = my_second_function()  # this will also call "make_request"

        first >> second


example_decorated_dag = task_decorator_example()

Which creates this graph view:

[image: graph_view]

Each task prints its task_id and params. The combined log output looks like this:

- my_first_function
{logging_mixin.py:104} INFO - Result: my_first_function
{logging_mixin.py:104} INFO - Params: {}
- my_second_function
{logging_mixin.py:104} INFO - Result: my_second_function
{logging_mixin.py:104} INFO - Params: {'foo': 'bar'}
- my_first_function__1
{logging_mixin.py:104} INFO - Result: my_first_function__1
{logging_mixin.py:104} INFO - Params: {}
- my_second_function__1
{logging_mixin.py:104} INFO - Result: my_second_function__1
{logging_mixin.py:104} INFO - Params: {'foo': 'bar'}
- my_first_function__2
{logging_mixin.py:104} INFO - Result: my_first_function__2
{logging_mixin.py:104} INFO - Params: {}

Hope that works for you!

Upvotes: 4
