Tegar D Pratama

Reputation: 167

Airflow: Problem with creating dynamic task within TaskGroup

I'm trying to build a dynamic workflow that creates one extract task per input file.

I get a "Broken DAG" error about a duplicate task ID:

Broken DAG: [/opt/airflow/dags/academi_dag.py] Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/baseoperator.py", line 430, in __init__
    task_group.add(self)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/task_group.py", line 140, in add
    raise DuplicateTaskIdFound(f"Task id '{key}' has already been added to the DAG")
airflow.exceptions.DuplicateTaskIdFound: Task id 'Review.extract__1' has already been added to the DAG

My code:

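import os
import re

from airflow.decorators import task
from airflow.utils.task_group import TaskGroup

# DATA_PATH is defined elsewhere in the DAG file

@task
def extract(filename):
    ...  # some_extract_function

@task
def transform(item: list):
    ...  # some_transform_function

# (this block sits inside the DAG context)
with TaskGroup('Review') as Review:
    data = []
    filenames = os.listdir(DATA_PATH)
    # keep only files whose names start with "review"
    filtered_filenames = list(filter(lambda x: re.match(r"(^review)", x), filenames))
    for filename in filtered_filenames:
        # every call creates a new task with an auto-generated task_id
        extract_review = extract(filename)
        data.append(extract_review)
    transformed_data_review = transform(data)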

The problem arises when I create tasks dynamically inside a TaskGroup. If I remove the TaskGroup, it works fine.

I found this issue: https://github.com/apache/airflow/issues/8057. Is there any way to fix this error, for example by creating a custom task_id dynamically? I know this is possible with PythonOperator, but I'd like to do it with the TaskFlow API instead.

Thanks
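The "custom task_id" idea boils down to deriving a unique, valid ID from each file name. Below is a minimal stdlib sketch, assuming IDs should contain only letters, digits, underscores and dashes (a conservative character set; check the rules of your Airflow version). make_task_id is a hypothetical helper, not an Airflow function, and how the resulting string is passed to a TaskFlow task depends on the Airflow version.

```python
import re

# Hypothetical helper (not part of Airflow): build a readable, deterministic
# ID from a file name so each dynamically created task gets its own ID.
def make_task_id(prefix: str, filename: str) -> str:
    # Replace anything other than letters, digits, underscores or dashes
    # (including the dot before the extension) with an underscore.
    safe = re.sub(r"[^A-Za-z0-9_-]", "_", filename)
    return f"{prefix}_{safe}"

filenames = ["review_2021-01.csv", "review 2021-02.json", "other.csv"]
# Same filter as the question's regex r"(^review)": names starting with "review".
review_files = [f for f in filenames if f.startswith("review")]
task_ids = [make_task_id("extract", f) for f in review_files]
print(task_ids)  # → ['extract_review_2021-01_csv', 'extract_review_2021-02_json']
```

Different file names can still map to the same ID (for example "a.b" and "a_b"), so checking that the list has no duplicates before creating tasks is a cheap safeguard.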

Upvotes: 4

Views: 10975

Answers (1)

Tegar D Pratama

Reputation: 167

Fixed thanks to this video here

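I fixed this by creating a nested TaskGroup for each file inside the outer TaskGroup. Each nested group adds its own prefix to the task IDs, so every extract task ends up with a unique ID such as Review.<filename>.extract.

Here's the code: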

with TaskGroup('Review') as Review:
    data = []
    filenames = os.listdir(DATA_PATH)
    filtered_filenames = list(filter(lambda x: re.match(r"(^review)", x), filenames))
    for filename in filtered_filenames:
        # one nested group per file gives each extract task a distinct
        # prefix, e.g. Review.<filename>.extract
        with TaskGroup(filename):
            extract_review = extract(filename)
            data.append(extract_review)
    transformed_data_review = transform(data)
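To see why the nested groups help, here is a toy model in plain Python. It is a simplified illustration, not Airflow's implementation: ToyDag and its add method are hypothetical, and the model only assumes, as the traceback suggests, that two extract tasks inside Review ended up with the same local ID. Keys are built by joining the group path and the task ID with dots.

```python
# Simplified model (not Airflow's real code) of group-prefixed task IDs.
class DuplicateTaskIdFound(Exception):
    pass

class ToyDag:
    def __init__(self):
        self.task_ids = set()

    def add(self, group_path, task_id):
        # Stored key = group path + task ID joined with dots,
        # e.g. ("Review",) and "extract" -> "Review.extract".
        key = ".".join(list(group_path) + [task_id])
        if key in self.task_ids:
            raise DuplicateTaskIdFound(f"Task id '{key}' has already been added to the DAG")
        self.task_ids.add(key)
        return key

filenames = ["review_a", "review_b"]

# Single group: the same local ID inside "Review" collides on the second file.
flat = ToyDag()
try:
    for name in filenames:
        flat.add(("Review",), "extract")
except DuplicateTaskIdFound as exc:
    print(exc)

# One nested group per file: the keys differ by their group prefix.
nested = ToyDag()
keys = [nested.add(("Review", name), "extract") for name in filenames]
print(keys)  # → ['Review.review_a.extract', 'Review.review_b.extract']
```

In the answer's code the nested group ID is the raw file name, so file names containing characters Airflow does not accept in IDs would need cleaning first.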

Upvotes: 4
