Reputation: 43
I have a use case where we have a set of, say, 10 files in an S3 directory. We are trying to rename those files to their corresponding mapped file name patterns in a second directory. I have created unique task IDs dynamically by passing the filename:
for file in rename_list:
    rename_task = RenameOperator(
        task_id="file_rename_task_{}".format(str(file.split(":")[0])),
        s3_conn_id=s3_CONN_ID,
        source_s3_bucket=source_bucket,
        destination_s3_bucket=destination_bucket,
        s3_key=source_prefix + source_key,
        rename_key=destination_key,
        output_prefix=output_prefix,
        dag=dag,
    )
I then need to perform another operation on each renamed file and finally move it to the final S3 directory. That second step also runs in a for loop, since we have multiple files; a rough sketch of its shape is below.
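For illustration only, the second loop looks roughly like this (SecondOperator, final_bucket and the keyword arguments are placeholders, not our actual code), with each rename task wired upstream of the corresponding second task:

# Placeholder sketch: SecondOperator stands in for our actual second operator.
for file in rename_list:
    file_id = str(file.split(":")[0])

    second_task = SecondOperator(
        task_id="file_second_task_{}".format(file_id),
        s3_conn_id=s3_CONN_ID,
        source_s3_bucket=destination_bucket,
        destination_s3_bucket=final_bucket,
        dag=dag,
    )

    # The rename task created above for the same file runs first.
    dag.get_task("file_rename_task_{}".format(file_id)) >> second_task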
Now, the issue is that the second operator/task is never executed. There is no error, but the Airflow logs say "Task is in the 'removed' state which is not a valid state for execution. The task must be cleared in order to be run." The rename tasks all succeed, but the second set of tasks is simply marked removed, with no output and no logs.
Any feedback on what might be going wrong here?
Upvotes: 1
Views: 1103
Reputation: 1096
Airflow workflows are static, and dynamic task IDs are not recommended. As far as I know, dynamic DAGs can be achieved through some hacks, but they are not directly supported by Airflow.
Without knowing how you have added dependencies between the tasks, it is difficult to identify the exact issue.
If my understanding of the problem is correct, the way I would design the workflow is:
download_files_from_source >> apply_some_operation_on_downloaded_files >> move_files_to_destination
This will work if you have a shared filesystem across workers. Each run of the DAG should have its own staging directory so that files belonging to different DAG runs do not overlap.
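A minimal sketch of that layout, assuming Airflow 2-style imports and PythonOperator tasks; download_files, apply_operation and move_files are placeholder callables for the three steps, and the per-run staging directory is built from the run_id template (op_kwargs is templated in Airflow 2):

# Sketch only; swap the placeholder callables for your own S3 logic.
# On Airflow 1.10 import from airflow.operators.python_operator instead.
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def download_files(staging_dir):
    """Placeholder: download the source files from S3 into staging_dir."""


def apply_operation(staging_dir):
    """Placeholder: rename/transform the files in staging_dir."""


def move_files(staging_dir):
    """Placeholder: upload the processed files to the destination bucket."""


with DAG(
    dag_id="s3_rename_pipeline",
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    # Per-run staging directory so concurrent DAG runs do not overlap.
    staging = "/tmp/staging/{{ run_id }}"

    download = PythonOperator(
        task_id="download_files_from_source",
        python_callable=download_files,
        op_kwargs={"staging_dir": staging},
    )
    transform = PythonOperator(
        task_id="apply_some_operation_on_downloaded_files",
        python_callable=apply_operation,
        op_kwargs={"staging_dir": staging},
    )
    move = PythonOperator(
        task_id="move_files_to_destination",
        python_callable=move_files,
        op_kwargs={"staging_dir": staging},
    )

    download >> transform >> move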
Otherwise, you could write a custom operator that does all three steps of the previous solution, i.e. download the files from the source, apply some operation on the downloaded files, and move the files to the destination.
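A rough sketch of such an operator, assuming Airflow 2-style imports (BaseOperator plus the Amazon provider's S3Hook; on 1.10 the hook lives in airflow.hooks.S3_hook) and a placeholder transform step:

# Sketch of one custom operator doing download -> transform -> upload.
import tempfile

from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook


class RenameAndMoveOperator(BaseOperator):
    def __init__(self, s3_conn_id, source_bucket, source_key,
                 destination_bucket, destination_key, **kwargs):
        super().__init__(**kwargs)
        self.s3_conn_id = s3_conn_id
        self.source_bucket = source_bucket
        self.source_key = source_key
        self.destination_bucket = destination_bucket
        self.destination_key = destination_key

    def execute(self, context):
        hook = S3Hook(aws_conn_id=self.s3_conn_id)
        with tempfile.TemporaryDirectory() as tmp_dir:
            # Download the source object to a local temp file.
            local_path = hook.download_file(
                key=self.source_key,
                bucket_name=self.source_bucket,
                local_path=tmp_dir,
            )
            # Placeholder: apply whatever operation you need on local_path here.
            # Upload the result under the renamed key in the destination bucket.
            hook.load_file(
                filename=local_path,
                key=self.destination_key,
                bucket_name=self.destination_bucket,
                replace=True,
            )

Since the download, transform and upload all happen inside a single execute(), this variant does not need a shared filesystem across workers.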
Upvotes: 1