Reputation: 458
I have a compute job that I want to scale dynamically according to load.
Since the computation runs on AWS Batch, I want to use the array size parameter. Here is how I do it with the AwsBatchOperator in Airflow:
task_batch = AwsBatchOperator(
    job_name=job_name,
    job_definition=job_def,
    job_queue=job_queue,
    array_properties={'size': " {{ ti.xcom_pull(task_ids='compute_arraysize') | int }}"},
    task_id="task_to_scale",
)
However, as far as I understand the documentation, the array_properties parameter isn't "templated" (not sure why, though), so my template is not rendered and I get this error:
Invalid type for parameter arrayProperties.size, value: {{ ti.xcom_pull(task_ids='compute_arraysize') | int }}, type: <class 'str'>, valid types: <class 'int'>
How can I fix this and set the size dynamically?
Upvotes: 0
Views: 396
Reputation: 15979
As explained by feruzzi, the PR will solve the issue; the fix will be available in apache-airflow-providers-amazon>6.0.0.
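Once a release newer than 6.0.0 is published, upgrading the provider package should pick up the fix; for example (assuming a pip-based install):

pip install "apache-airflow-providers-amazon>6.0.0"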
However, you can still solve the issue on your current version by adding the needed parameter to the operator's template_fields:
class MyBatchOperator(BatchOperator):
    # Expose array_properties to Jinja templating, in addition to the fields
    # the base operator already templates.
    template_fields = (
        "array_properties",
    ) + BatchOperator.template_fields
Then you can do:
task_batch = MyBatchOperator(
    job_name=job_name,
    job_definition=job_def,
    job_queue=job_queue,
    array_properties={'size': "{{ ti.xcom_pull(task_ids='compute_arraysize') | int }}"},
    task_id="task_to_scale",
)
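One thing to watch, as an aside that neither answer claims: by default Jinja renders every template to a string, so even once array_properties is templated, the | int filter still yields something like "12" rather than 12. If Batch keeps rejecting the type after the change above, enabling native rendering on the DAG should make the rendered value an actual int. A minimal sketch, assuming Airflow 2.1+ (where render_template_as_native_obj exists) and a hypothetical DAG wrapped around your task:

from datetime import datetime

from airflow import DAG

with DAG(
    dag_id="batch_array_scaling",        # hypothetical DAG id for this sketch
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,
    render_template_as_native_obj=True,  # render templates to native Python types, not strings
) as dag:
    task_batch = MyBatchOperator(
        job_name=job_name,
        job_definition=job_def,
        job_queue=job_queue,
        # With native rendering, "| int" now produces a real int for Batch.
        array_properties={"size": "{{ ti.xcom_pull(task_ids='compute_arraysize') | int }}"},
        task_id="task_to_scale",
    )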
Upvotes: 1
Reputation: 141
The easy local fix is to add "array_properties" to the template_fields sequence on line 104 of the AwsBatchOperator. I will put in a pull request to implement it in the Airflow code, but it will take some time for that change to make its way to you.
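If editing the installed provider source isn't convenient, the same local fix can be applied from the DAG file by extending the class attribute before any tasks are instantiated. This is only a sketch of that idea, not something stated in this answer, and the import path may differ between provider versions:

# Sketch: add "array_properties" to the templated fields at parse time instead of
# editing the provider package. Same effect as the subclass shown in the other answer.
from airflow.providers.amazon.aws.operators.batch import AwsBatchOperator  # path may vary by provider version

AwsBatchOperator.template_fields = tuple(AwsBatchOperator.template_fields) + ("array_properties",)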
[EDIT] The PR to fix it permanently is here if you want to follow its progress. It should be in the next Amazon Provider Package release, whenever that gets published.
Upvotes: 2