Reputation: 111
I have a workflow that generates a number of *.tif
files and saves them to S3. A function then queries the contents of that S3 directory and returns them as arrays. Based on this return value, the corresponding number of BatchOperators should be created
dynamically in the DAG, with each of the arrays assigned to its BatchOperator as an env variable.
Example: for the return value
[[a.tif, b.tif], [c.tif, d.tif], [e.tif]]
three BatchOperators should be created, with the arrays passed to them as env variables:
BatchOperator1 - env var [a.tif, b.tif]
BatchOperator2 - env var [c.tif, d.tif]
BatchOperator3 - env var [e.tif]
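For context, the S3-querying function I mean could be sketched like this (boto3 and the fixed group size are just for illustration, not my actual code):
import boto3

def list_tif_groups(bucket, prefix, group_size=2):
    # List *.tif keys under the prefix and split them into
    # groups of group_size (the last group may be shorter).
    s3 = boto3.client("s3")
    resp = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
    keys = [o["Key"] for o in resp.get("Contents", [])
            if o["Key"].endswith(".tif")]
    return [keys[i:i + group_size]
            for i in range(0, len(keys), group_size)]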
Upvotes: 0
Views: 226
Reputation: 43
You will want to use the .partial() and .expand() functions with the BatchOperator task. Put the constant arguments in partial(), and the elements to map over in expand(), like so:
from airflow.operators.python import PythonOperator

list_to_print = ["Print", "Each", "Of", "These"]

def printList(element):
    print(element)

# Constants such as python_callable go in partial(); expand()
# creates one mapped task instance per entry of op_args, where
# each entry is that instance's positional-argument list.
task_print_list = PythonOperator.partial(
    task_id='print_list',
    python_callable=printList,
).expand(op_args=[[element] for element in list_to_print])
This creates one mapped task instance per element in the list. In your case, you would pass the list of arrays returned by your S3-querying function to expand(), so that each BatchOperator receives one array. More documentation can be seen here: https://airflow.apache.org/docs/apache-airflow/2.3.0/concepts/dynamic-task-mapping.html
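A minimal sketch of how that could look with the AWS provider's BatchOperator, expanding over its overrides argument (the env var name TIF_FILES, the comma-joined value format, and the job name/definition/queue are placeholders I made up; in recent provider versions the parameter is named container_overrides instead of overrides):
from airflow.decorators import task
from airflow.providers.amazon.aws.operators.batch import BatchOperator

@task
def get_tif_groups():
    # Stand-in for your S3-querying function; returns the
    # grouped keys from your example.
    return [["a.tif", "b.tif"], ["c.tif", "d.tif"], ["e.tif"]]

@task
def to_overrides(groups):
    # Build one containerOverrides dict per group, so each
    # mapped BatchOperator gets its group as an env variable.
    return [
        {"environment": [{"name": "TIF_FILES", "value": ",".join(g)}]}
        for g in groups
    ]

BatchOperator.partial(
    task_id="process_tifs",
    job_name="process-tifs",        # hypothetical job settings
    job_definition="my-job-def",
    job_queue="my-job-queue",
).expand(overrides=to_overrides(get_tif_groups()))
This gives you one Batch job per array, and the number of jobs follows whatever the S3 query returns at run time.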
Upvotes: 2