Reputation: 567
Airflow Version: 2.2.5
Composer Version: 2.0.19
We have a task group which creates tasks dynamically using a for loop. Within the task group we use BigQueryTableDeleteOperator to delete the tables.
Issue: We noticed that once the tables are deleted, all the tasks move to the REMOVED state, breaking the Gantt chart with the error message "Task not found".
Before the tasks run: Image 1
After the tasks run: Image 2
As shown above, before the task group runs, each table to be deleted is represented by its own task (two tasks in this example). Once the tasks succeed and the tables are deleted, those tasks are removed.
Sharing the piece of code below:
for table in tables_list:
    table_name = projectid + '.' + dataset + '.' + table
    if table not in safe_tables:
        delete_table_task = bigquery_table_delete_operator.BigQueryTableDeleteOperator(
            task_id=f"delete_tables_{table_name}",
            deletion_dataset_table=f"{table_name}",
            ignore_if_missing=True)
        list_operator += [delete_table_task]

print(list_operator)
dummy_task >> list_operator
tables_list: List of tables to be deleted
safe_tables: List of tables not to be deleted
Please let me know what we are missing here that is causing the issue.
Upvotes: 1
Views: 1172
Reputation: 5100
Every dag_dir_list_interval, the DagFileProcessorManager lists the scripts in the dags folder; then, if a script is new or has not been processed for more than min_file_process_interval, it creates a DagFileProcessorProcess for the file to process it and generate the dags.
Since your dag is based on a method which collects the existing tables in tables_list and uses it to create the operators, during the dag file processing just after the tables are deleted, the method will return an empty list and the dag will no longer have any BigQueryTableDeleteOperator task instances.
For Airflow 2.2.5, you can create a new custom operator based on BigQueryTableDeleteOperator which takes the list and deletes the tables in a loop; in this case you will have one task for all the tables, and the deletions will run in sequence.
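A minimal sketch of such an operator, assuming the BigQueryHook from the Google provider (the class name BigQueryTablesDeleteOperator and its parameters are illustrative, not an existing operator):

from airflow.models.baseoperator import BaseOperator
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook


class BigQueryTablesDeleteOperator(BaseOperator):
    """Delete a list of BigQuery tables sequentially in a single task."""

    template_fields = ("deletion_dataset_tables",)

    def __init__(self, *, deletion_dataset_tables, ignore_if_missing=True,
                 gcp_conn_id="google_cloud_default", **kwargs):
        super().__init__(**kwargs)
        # list of fully qualified names, e.g. "project.dataset.table"
        self.deletion_dataset_tables = deletion_dataset_tables
        self.ignore_if_missing = ignore_if_missing
        self.gcp_conn_id = gcp_conn_id

    def execute(self, context):
        hook = BigQueryHook(gcp_conn_id=self.gcp_conn_id)
        for table in self.deletion_dataset_tables:
            self.log.info("Deleting table %s", table)
            # not_found_ok mirrors ignore_if_missing from the original operator
            hook.delete_table(table_id=table, not_found_ok=self.ignore_if_missing)

Since the dag then always contains this single task regardless of how many tables exist at parse time, the task no longer disappears from the Gantt chart after the deletion.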
In Airflow >= 2.3.0 (if the upgrade is an option for you), you can use the new Dynamic Task Mapping feature, which can process a list at runtime and handle each element in a separate task.
You can do something like:

BigQueryTableDeleteOperator.partial(
    task_id="delete_tables",
    ignore_if_missing=True,
).expand(
    deletion_dataset_table=tables_list,
)
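If you also want the list of tables to be computed at runtime rather than at parse time, you can map over the output of an upstream task instead of a plain list; a sketch, assuming an illustrative get_tables_to_delete helper:

from airflow.decorators import task

@task
def get_tables_to_delete():
    # Illustrative: build the list of fully qualified table names to delete at
    # run time, e.g. by listing the dataset and filtering out safe_tables.
    return ["project.dataset.table_a", "project.dataset.table_b"]

BigQueryTableDeleteOperator.partial(
    task_id="delete_tables",
    ignore_if_missing=True,
).expand(
    deletion_dataset_table=get_tables_to_delete(),
)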
Upvotes: 2