bradimus

Reputation: 855

Celery + SQS - with 'queue_name_prefix' set, tasks are written to the wrong queue

I'm trying to set up a Celery task using SQS as the broker. I was able to get it working with the following minimal reproducible example:

from celery import Celery

# aws_access and aws_secret are placeholders for my actual credentials
app = Celery('tasks', broker=f'sqs://{aws_access}:{aws_secret}@')

@app.task
def test(s):
    print(s)

I run this with celery -A tasks worker --loglevel=INFO, and then call the task from a Python shell:

from tasks import test
test.delay('printme')

This works fine. The problem is that I want to run this in multiple environments, so I want a separate queue per environment. I found the docs for queue_name_prefix, which seems like exactly what I want, but I am unable to get it to work.

What I've tried:

First I added a config.py file as such:

broker_transport_options = {
  'queue_name_prefix': 'dev-',
  'region': 'us-east-1'
}

and ran it with celery -A tasks worker --loglevel=INFO --config=config.

This creates a dev-celery queue on AWS, but when I call test.delay('printme'), the task doesn't execute.

I then noticed that if I went back and ran celery without the --config flag, it ran my test task. I checked and confirmed that the task_id matched, so it seems that, even though the worker is reading off the dev-celery queue, I'm still writing to the default celery queue.
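A quick way to confirm this (a diagnostic sketch, assuming boto3 is installed and the same us-east-1 region as the broker) is to compare the approximate message counts across the queues:

import boto3

# Diagnostic sketch: list all SQS queues and print how many messages
# each one is holding, to see whether tasks land on 'celery' or 'dev-celery'.
sqs = boto3.client('sqs', region_name='us-east-1')
for url in sqs.list_queues().get('QueueUrls', []):
    attrs = sqs.get_queue_attributes(
        QueueUrl=url,
        AttributeNames=['ApproximateNumberOfMessages'],
    )
    print(url, attrs['Attributes']['ApproximateNumberOfMessages'])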

I've also tried using app.conf.update to set these options on the Celery app in code, but that doesn't seem to work either.
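For reference, that attempt looked roughly like this (a sketch, not my exact code; aws_access and aws_secret are the same placeholders as above):

from celery import Celery

app = Celery('tasks', broker=f'sqs://{aws_access}:{aws_secret}@')
# Same options as config.py, applied in code instead of via --config.
app.conf.update(
    broker_transport_options={
        'queue_name_prefix': 'dev-',
        'region': 'us-east-1',
    }
)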

How can I get my task onto the dev-celery queue?

Upvotes: 1

Views: 1791

Answers (2)

Mohd Shoaib

Reputation: 43

I have used the code below for queue_name_prefix. It is from a Flask app running in production.

from celery import Celery

def make_celery(app):
    celery = Celery(
        app.import_name,
        broker="sqs://",  # credentials come from the environment / IAM role
        broker_transport_options={
            # SERVICE_ENV and SERVICE_NAME are placeholders (e.g. "dev" and
            # "billing"), giving a queue prefix like "dev-billing-".
            "queue_name_prefix": f"{SERVICE_ENV}-{SERVICE_NAME}-"
        },
    )
    task_base = celery.Task

    # Run every task inside the Flask application context.
    class ContextTask(task_base):
        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return task_base.__call__(self, *args, **kwargs)

    celery.Task = ContextTask

    return celery
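For context, here is a minimal sketch of how the factory above might be wired up (the Flask app and the add task are illustrations, not part of the original code):

from flask import Flask

flask_app = Flask(__name__)  # hypothetical Flask app
celery = make_celery(flask_app)

@celery.task
def add(x, y):
    # Runs inside flask_app's application context via ContextTask.
    return x + y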

Upvotes: 0

bradimus

Reputation: 855

So I did get this working, although I don't know if this is the optimal way. The problem seemed to be that the task was still being sent to the default "celery" queue, even though the worker was now listening on the "dev-celery" queue. Here's how I finally got it to work:

In the tasks.py code, I added a call to app.conf.update:

app = Celery('tasks')
app.conf.update({
    'broker_url': f'sqs://{aws_access}:{aws_secret}@',
    'task_routes': {
        # Task names include the module, so route the fully qualified name.
        'tasks.test': {'queue': 'dev-celery'}
    }
})
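With the route key matching the task's registered name, a plain delay() call should also land on the dev-celery queue (a sketch, assuming the worker is consuming from that queue):

from tasks import test

# Routed to the dev-celery queue by task_routes; no explicit queue needed.
test.delay('printme')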

and then, when sending a task to the queue, I used the apply_async method to declare the queue explicitly:

test.apply_async(
    args=['print test'],
    queue='dev-celery'
)
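As an aside, if every task in an environment should go to the same queue, Celery's task_default_queue setting may be a simpler knob than routing each task individually (a sketch; I only verified the apply_async approach above):

# Alternative sketch: change the app-wide default queue so plain
# .delay() calls publish to dev-celery without per-task routes.
app.conf.task_default_queue = 'dev-celery'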

Hopefully that helps someone ;)

Upvotes: 4
