codninja0908

Reputation: 567

Dynamically generated tasks in Airflow 2.2.5 are moved to the "REMOVED" state and break the Gantt chart

Airflow Version : 2.2.5
Composer Version : 2.0.19

We have a task group which creates tasks dynamically using a for loop. Within the task group we use BigQueryTableDeleteOperator to delete the tables.

Issue: We noticed that once the tables are deleted, all the tasks are moved to the REMOVED state, which breaks the Gantt chart with a "Task not found" error message.

Before the tasks run: Image 1


After the task runs: Image 2


As shown above, before the task group runs, it shows all the tables to be deleted, each represented by a task (in this example, 2 tasks).

Once the tasks run successfully and the tables are deleted, those tasks are removed.

Sharing the relevant piece of code below:

# deprecated module path kept from our code; the current import is
# airflow.providers.google.cloud.operators.bigquery.BigQueryDeleteTableOperator
from airflow.providers.google.cloud.operators import bigquery_table_delete_operator

list_operator = []
for table in tables_list:
    table_name = f"{projectid}.{dataset}.{table}"
    if table not in safe_tables:
        delete_table_task = bigquery_table_delete_operator.BigQueryTableDeleteOperator(
            task_id=f"delete_tables_{table_name}",
            deletion_dataset_table=table_name,
            ignore_if_missing=True,
        )
        list_operator.append(delete_table_task)

dummy_task >> list_operator

tables_list : List of tables to be deleted

safe_tables : List of tables not to be deleted

Please let me know what we are missing here that is causing the issue.

Upvotes: 1

Views: 1172

Answers (1)

Hussein Awala

Reputation: 5100

Every dag_dir_list_interval, the DagFileProcessorManager lists the scripts in the dags folder; then, if a script is new or was last processed more than min_file_process_interval ago, it creates a DagFileProcessorProcess for the file to process it and generate the DAGs.

Since your DAG is based on a method which collects the existing tables into tables_list and uses it to create the operators, during the DAG file processing that happens just after the tables are deleted, that method returns an empty list, so the DAG no longer contains any BigQueryTableDeleteOperator task and the existing task instances are marked as REMOVED.

For Airflow 2.2.5, you can create a new custom operator based on BigQueryTableDeleteOperator which takes the list and deletes the tables in a loop; in this case you will have one task for all the tables, and the deletes will run in sequence (see the sketch below).
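A minimal sketch of such an operator, assuming the google provider's BigQueryHook.delete_table method is available (adjust the hook call to your provider version); the class name and parameters are illustrative:

from airflow.models import BaseOperator
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook


class BigQueryDeleteTablesOperator(BaseOperator):
    # Deletes every table in `deletion_dataset_tables` sequentially in one task,
    # so the task_id does not depend on the table names.
    def __init__(self, *, deletion_dataset_tables, ignore_if_missing=True,
                 gcp_conn_id="google_cloud_default", **kwargs):
        super().__init__(**kwargs)
        self.deletion_dataset_tables = deletion_dataset_tables
        self.ignore_if_missing = ignore_if_missing
        self.gcp_conn_id = gcp_conn_id

    def execute(self, context):
        hook = BigQueryHook(gcp_conn_id=self.gcp_conn_id)
        for table in self.deletion_dataset_tables:
            self.log.info("Deleting table %s", table)
            # not_found_ok mirrors ignore_if_missing from BigQueryTableDeleteOperator
            hook.delete_table(table_id=table, not_found_ok=self.ignore_if_missing)

Used with the names from your DAG, it would look something like:

delete_tables = BigQueryDeleteTablesOperator(
    task_id="delete_tables",
    deletion_dataset_tables=[
        f"{projectid}.{dataset}.{t}" for t in tables_list if t not in safe_tables
    ],
)
dummy_task >> delete_tables

Because the task_id is static, the task stays in the graph even when the table list changes between DAG file parses.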

In Airflow >= 2.3.0 (if the upgrade is an option for you), you can use the new Dynamic Task Mapping feature, which can process a list at runtime, creating a separate task for each element. You can do something like:

BigQueryTableDeleteOperator.partial(
    task_id="delete_tables",
    ignore_if_missing=True
).expand(
    deletion_dataset_table=tables_list
)

Upvotes: 2
