mfdoom
mfdoom

Reputation: 11

How to set SQS predefined_queues for Celery with Apache Airflow configuration?

I am configuring Apache Airflow using celery with Amazon SQS. I understand that Celery allows for broker_transport_options (https://docs.celeryproject.org/en/stable/getting-started/brokers/sqs.html) and Airflow contains a section in its config called celery_broker_transport_options.

I understand that I am able to pass simple strings in the Airflow config. For example, in. the celery_broker_transport section, I could pass:

region = us-west-1

which would be the equivalent of saying to celery:

broker_transport_options = {'region': 'us-west-1'}

I am trying to pass the predefined_queues option in Airflow, which looks like the following in Celery:

broker_transport_options = {
    'predefined_queues': {
        'my-q': {
            'url': 'https://ap-southeast-2.queue.amazonaws.com/123456/my-q',
            'access_key_id': 'xxx',
            'secret_access_key': 'xxx',
        }
    }
}

I am unsure how to pass this information to Airflow. I have tried the following, and I get an error saying that 'str' object has no attribute 'items':

predefined_queues = 'my-q': { 'url': 'https://sqs.us-east-1.amazonaws.com/1234567890/my-q', }

Upvotes: 1

Views: 1150

Answers (1)

ptd
ptd

Reputation: 3053

You can't set this value directly via the airflow config file. Instead, you need to use the celery_config_options configuration value to point to a module that sets predefined_queues in python code.

Something like this is likely what you want:

from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG

CELERY_CONFIG = {
    **DEFAULT_CELERY_CONFIG,
    "broker_transport_options": {
        **DEFAULT_CELERY_CONFIG["broker_transport_options"],
        "predefined_queues": {
            "my-q": { "url": "https://sqs.us-east-1.amazonaws.com/1234567890/my-q" },
        },
    },
}

If you put that in a file called celery_config.py, you should then be able to set celery_config_options = celery_config.CELERY_CONFIG in your configuration file and have celery configured properly.

Upvotes: 2

Related Questions