Reputation: 11
I am configuring Apache Airflow using celery with Amazon SQS. I understand that Celery allows for broker_transport_options (https://docs.celeryproject.org/en/stable/getting-started/brokers/sqs.html) and Airflow contains a section in its config called celery_broker_transport_options.
I understand that I am able to pass simple strings in the Airflow config. For example, in. the celery_broker_transport section, I could pass:
region = us-west-1
which would be the equivalent of saying to celery:
broker_transport_options = {'region': 'us-west-1'}
I am trying to pass the predefined_queues option in Airflow, which looks like the following in Celery:
broker_transport_options = {
'predefined_queues': {
'my-q': {
'url': 'https://ap-southeast-2.queue.amazonaws.com/123456/my-q',
'access_key_id': 'xxx',
'secret_access_key': 'xxx',
}
}
}
I am unsure how to pass this information to Airflow. I have tried the following, and I get an error saying that 'str' object has no attribute 'items':
predefined_queues = 'my-q': { 'url': 'https://sqs.us-east-1.amazonaws.com/1234567890/my-q', }
Upvotes: 1
Views: 1150
Reputation: 3053
You can't set this value directly via the airflow config file. Instead, you need to use the celery_config_options
configuration value to point to a module that sets predefined_queues
in python code.
Something like this is likely what you want:
from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG
CELERY_CONFIG = {
**DEFAULT_CELERY_CONFIG,
"broker_transport_options": {
**DEFAULT_CELERY_CONFIG["broker_transport_options"],
"predefined_queues": {
"my-q": { "url": "https://sqs.us-east-1.amazonaws.com/1234567890/my-q" },
},
},
}
If you put that in a file called celery_config.py
, you should then be able to set celery_config_options = celery_config.CELERY_CONFIG
in your configuration file and have celery configured properly.
Upvotes: 2