Michal Dolnik

Reputation: 111

Apache Airflow - dynamically generate number of BatchOperators (AWS) based on the number of files on AWS S3

I have a workflow that generates a number of *.tif files and saves them on S3. There is also a function that queries the contents of a directory on S3 and returns them as arrays. Based on this return value, a number of BatchOperators should be created dynamically in the DAG, and each of the arrays should be assigned to a BatchOperator as an environment variable.

Example:
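A minimal sketch of the listing function described above (the bucket name, prefix, and the chunk size of 10 are placeholders, not part of the actual setup):

from airflow.providers.amazon.aws.hooks.s3 import S3Hook

def list_tif_arrays(bucket: str, prefix: str) -> list[list[str]]:
    """Query the S3 prefix and return the *.tif keys grouped into arrays,
    one array per downstream Batch job."""
    keys = S3Hook().list_keys(bucket_name=bucket, prefix=prefix) or []
    tif_keys = [k for k in keys if k.endswith(".tif")]
    # Chunk size of 10 is an arbitrary placeholder.
    return [tif_keys[i:i + 10] for i in range(0, len(tif_keys), 10)]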

Upvotes: 0

Views: 226

Answers (1)

Empty Whale

Reputation: 43

You will want to use the .partial() and .expand() functions on the operator. Pass the constants to partial(), and the elements to loop over to expand(), like so:

from airflow.operators.python import PythonOperator

list_to_print = ["Print", "Each", "Of", "These"]

def print_item(item):
    # Each mapped task instance receives one element of the list.
    print(item)

# Constants go into partial(); the values to map over go into expand().
# expand() takes operator arguments, so each element is wrapped as the
# op_args list for one mapped task instance.
task_print_list = PythonOperator.partial(
    task_id="print_list",
    python_callable=print_item,
).expand(op_args=[[item] for item in list_to_print])

This creates one mapped task instance per element of the list. In your case, pass the output of the function that builds the arrays from S3 as the value given to expand(). More documentation can be seen here: https://airflow.apache.org/docs/apache-airflow/2.3.0/concepts/dynamic-task-mapping.html
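Applied to the question, a rough sketch might look like the following. It reuses the list_tif_arrays helper sketched in the question and hands each array to its BatchOperator as an environment variable through container_overrides. The job name, job definition, queue, and the TIF_FILES variable are assumptions, and older versions of the Amazon provider name the parameter overrides rather than container_overrides:

from airflow.decorators import task
from airflow.providers.amazon.aws.operators.batch import BatchOperator

@task
def build_container_overrides() -> list[dict]:
    # list_tif_arrays is the listing helper sketched in the question;
    # each array of keys becomes the environment for one mapped Batch job.
    return [
        {"environment": [{"name": "TIF_FILES", "value": ",".join(array)}]}
        for array in list_tif_arrays("my-bucket", "tif-output/")
    ]

# Constants go into partial(); the list to map over goes into expand().
# One BatchOperator task instance is created per returned dict.
process_tifs = BatchOperator.partial(
    task_id="process_tifs",
    job_name="process-tifs",          # placeholder Batch job name
    job_definition="tif-processor",   # placeholder job definition
    job_queue="default-queue",        # placeholder job queue
).expand(container_overrides=build_container_overrides())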

Upvotes: 2
