Amit Singh

Reputation: 43

Airflow tasks execution on multiple s3 keys followed by next task execution

I have a use case where we have a set of, say, 10 files in an S3 directory. We are trying to rename each file to its corresponding mapped file name pattern in a second directory. I have created the task IDs dynamically and uniquely by passing the filename.

for file in rename_list:
    rename_task = RenameOperator(
        task_id="file_rename_task_{}".format(str(file.split(":")[0])),
        s3_conn_id=s3_CONN_ID,
        source_s3_bucket=source_bucket,
        destination_s3_bucket=destination_bucket,
        s3_key=source_prefix + source_key,
        rename_key=destination_key,
        output_prefix=output_prefix,
        dag=dag,
    )

Then I need to perform another operation on each file and finally move it to the final S3 directory. That also runs in a for loop, since we have multiple files.

Now, the issue is that the second operator/task is not getting called. There is no error, but the Airflow logs say "Task is in the 'removed' state which is not a valid state for execution. The task must be cleared in order to be run." The rename tasks from the first loop are all successful, but the second set of operators are simply getting removed: no output, no logs.

Any feedback on what might be going wrong here?

Upvotes: 1

Views: 1103

Answers (1)

arunvelsriram

Reputation: 1096

Airflow workflows are static, and it's not recommended to have dynamic task IDs. As far as I know, dynamic DAGs can be achieved through some hacks, but they are not directly supported by Airflow.

Now, the issue is that the second operator/task is not getting called. There is no error, but the Airflow logs say "Task is in the 'removed' state which is not a valid state for execution. The task must be cleared in order to be run." The rename tasks from the first loop are all successful, but the second set of operators are simply getting removed: no output, no logs.

Without knowing how you have added dependencies between the tasks, it is difficult to identify the issue.

My understanding of the problem:

  1. you have an s3 bucket containing some files in a path (source)
  2. you need to apply some operation on each of those files
  3. move the finished files to a new s3 path (destination)

If my understanding is correct, the way I would design the workflow is:

download_files_from_source >> apply_some_operation_on_downloaded_files >> move_files_to_destination

This will work if you have a shared filesystem across workers. Each run of the DAG should have its own staging directory so that files belonging to different DAG runs do not overlap.
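For illustration, here is a minimal sketch of that layout, assuming Airflow 2.x imports; the callables and the STAGING_ROOT path are hypothetical, and the staging directory is keyed by run_id so parallel DAG runs stay isolated:

import os
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

STAGING_ROOT = "/tmp/s3_rename_staging"  # assumed shared-filesystem location


def download_files(**context):
    # pull every source key from S3 into a per-run staging directory
    staging_dir = os.path.join(STAGING_ROOT, context["run_id"])
    os.makedirs(staging_dir, exist_ok=True)
    # ... download the objects here, e.g. with S3Hook ...


def apply_operation(**context):
    # rename/transform each downloaded file in the staging directory
    staging_dir = os.path.join(STAGING_ROOT, context["run_id"])
    # ... apply the operation here ...


def move_files(**context):
    # upload the processed files to the destination path and clean up staging
    staging_dir = os.path.join(STAGING_ROOT, context["run_id"])
    # ... upload and delete here ...


with DAG(
    dag_id="s3_rename_pipeline",
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    download = PythonOperator(task_id="download_files_from_source",
                              python_callable=download_files)
    operate = PythonOperator(task_id="apply_some_operation_on_downloaded_files",
                             python_callable=apply_operation)
    move = PythonOperator(task_id="move_files_to_destination",
                          python_callable=move_files)

    download >> operate >> move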

Otherwise, you could write a custom operator that does all three tasks from the previous solution in one place, i.e. download the files from source, apply some operation on the downloaded files, and move the files to destination.
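A rough sketch of that alternative, assuming the Amazon provider's S3Hook is available; the operator name, its parameters, and the per-key processing indicated in the comments are illustrative, not the asker's actual RenameOperator:

from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook


class S3ProcessAndMoveOperator(BaseOperator):
    """Download, process and move every object under a source prefix."""

    def __init__(self, s3_conn_id, source_bucket, source_prefix,
                 destination_bucket, destination_prefix, **kwargs):
        super().__init__(**kwargs)
        self.s3_conn_id = s3_conn_id
        self.source_bucket = source_bucket
        self.source_prefix = source_prefix
        self.destination_bucket = destination_bucket
        self.destination_prefix = destination_prefix

    def execute(self, context):
        hook = S3Hook(aws_conn_id=self.s3_conn_id)
        keys = hook.list_keys(bucket_name=self.source_bucket,
                              prefix=self.source_prefix) or []
        for key in keys:
            self.log.info("Processing %s", key)
            # 1. download the object (e.g. hook.download_file / hook.read_key)
            # 2. apply the operation and work out the renamed key
            # 3. upload the result to destination_bucket/destination_prefix
            #    and optionally delete the source object

Keeping everything inside one execute() call avoids per-file task IDs altogether, at the cost of losing per-file retries and visibility in the UI.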

Upvotes: 1
