Reputation: 167
I'm trying to build a dynamic workflow, but I'm getting this broken DAG error about a duplicate task id:
Broken DAG: [/opt/airflow/dags/academi_dag.py] Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/baseoperator.py", line 430, in __init__
task_group.add(self)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/task_group.py", line 140, in add
raise DuplicateTaskIdFound(f"Task id '{key}' has already been added to the DAG")
airflow.exceptions.DuplicateTaskIdFound: Task id 'Review.extract__1' has already been added to the DAG
My code:
import os
import re

from airflow.decorators import task
from airflow.utils.task_group import TaskGroup

@task
def extract(filename):
    some_extract_function

@task
def transform(item: list):
    some_transform_function

with TaskGroup('Review') as Review:
    data = []
    filenames = os.listdir(DATA_PATH)
    filtered_filenames = list(filter(lambda x: re.match(r"(^review)", x), filenames))
    for filename in filtered_filenames:
        extract_review = extract(filename)
        data.append(extract_review)
    transformed_data_review = transform(data)
The problem arises when I try to create tasks dynamically inside a TaskGroup; if I remove the TaskGroup, it works fine.
I found this related issue: https://github.com/apache/airflow/issues/8057 . Is there any way to fix this error, such as creating a custom task_id dynamically? I know this is possible with the classic PythonOperator, where I can build a unique task_id per file myself, roughly like this (a sketch, with extract_fn standing in for my real callable):
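from airflow.operators.python import PythonOperator

for filename in filtered_filenames:
    extract_review = PythonOperator(
        task_id=f'extract_{filename}',  # unique id per file avoids the clash
        python_callable=extract_fn,     # placeholder for the real extract function
        op_args=[filename],
    )
But I'd like to do this using the TaskFlow API instead.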
Thanks
Upvotes: 4
Views: 10975
Reputation: 167
Fixed thanks to this video here.
I solved it by creating a TaskGroup dynamically for each file inside the outer TaskGroup. Each nested group prefixes its children's task ids (e.g. Review.<filename>.extract), so every extract task ends up with a unique id.
Here's the code:
with TaskGroup('Review') as Review:
    data = []
    filenames = os.listdir(DATA_PATH)
    filtered_filenames = list(filter(lambda x: re.match(r"(^review)", x), filenames))
    for filename in filtered_filenames:
        # a nested TaskGroup per file gives each extract task a unique id prefix
        with TaskGroup(filename):
            extract_review = extract(filename)
            data.append(extract_review)
    transformed_data_review = transform(data)
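As a side note: newer Airflow versions also let you override the generated task id of a decorated task per call, which avoids the nested groups entirely. A minimal sketch, assuming a version where decorated tasks support .override() and reusing the same extract task as above:
for filename in filtered_filenames:
    # give each dynamically created task an explicit, unique task_id
    extract_review = extract.override(task_id=f'extract_{filename}')(filename)
    data.append(extract_review)
With that approach, all the extract tasks can stay directly inside the Review group.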
Upvotes: 4