Reputation: 855
Im trying to set up a Celery task using SQS as the broker. I was able to get it working using the following (minimum reproducable):
from celery import Celery
app = Celery('tasks', broker=f'sqs://{aws_access}:{aws_secret}@')
@app.task
def test(s):
print(s)
I run this celery -A tasks worker --loglevel=INFO
, and then call it from the shell via:
from tasks import test
test.delay('printme')
This works fine. The problem is, that I want to run this in multiple environments, so I want to designate a separate queue per environment. I found the docs for queue_name_prefix
, which seems like what I want, but I am unable to get it to work.
What I've tried:
First I added a config.py
file as such:
broker_transport_options = {
'queue_name_prefix': 'dev-',
'region': 'us-east-1'
}
and run it celery -A tasks worker --loglevel=INFO --config=config
this creates a dev-celery
queue on aws, but when I try test.delay('printme')
it doesn't execute.
I then noticed that if I went back and ran celery without the --config
flag it ran my test task. I checked and confirmed that the task_id
matched, so it seems that, even though I'm running celery to read off the dev-celery
queue, Im still writing to the celery
queue.
I've also tried using app.conf.update
to update the celery app in code, but it doesn't seem to be working.
How can I put scheduled job on the dev-celery
queue?
Upvotes: 1
Views: 1791
Reputation: 43
I have used the below code for queue_name_prefix... It is in Flask App running on production..
from celery import Celery
def make_celery(app):
celery = Celery(
app.import_name,
broker="sqs://",
broker_transport_options={
"queue_name_prefix": "{SERVICE_ENV}-{SERVICE_NAME}-"
},
)
task_base = celery.Task
class ContextTask(task_base):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return task_base.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery
Upvotes: 0
Reputation: 855
So I did get this working, although I dont know if this is the optimal way. The problem seemed to be that the task was still being sent to the default "celery" queue, even though the worker was now listening on the "dev-celery" queue. Here's how I finally got it to work:
In the tasks.py
code, I added a call to conf.update
:
app = Celery('tasks')
app.config.update({
'broker_url': f'sqs://{aws_access}:{aws_secret}@',
'task_routes': {
'test': {'queue': 'dev-celery'}
}
})
and then when sending a task to the queue, using the apply_async
method to explicitly declare the queue to use:
test.apply_async(
args=['print test'],
queue='dev-celery'
)
Hopefully that helps someone ;)
Upvotes: 4