Reputation: 363467
I've defined a Celery app in a module, and now I want to start the worker from the same module in its __main__, i.e. by running the module with python -m instead of celery from the command line. I tried this:
app = Celery('project', include=['project.tasks'])

# do all kinds of project-specific configuration
# that should occur whenever this module is imported

if __name__ == '__main__':
    # log stuff about the configuration
    app.start(['worker', '-A', 'project.tasks'])
but now Celery thinks I'm running the worker without arguments:
Usage: worker <command> [options]

Show help screen and exit.

Options:
  -A APP, --app=APP     app instance to use (e.g. module.attr_name)

[snip]
The usage message is the one you get from celery --help, as if it didn't get a command. I've also tried

app.worker_main(['-A', 'project.tasks'])

but that complains about the -A not being recognized.
So how do I do this? Or alternatively, how do I pass a callback to the worker to have it log information about its configuration?
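(I suspect Celery's signals could handle the callback part; a minimal sketch of what I have in mind, assuming the celeryd_init signal and Celery 4+ lowercase setting names:)

from celery.signals import celeryd_init

@celeryd_init.connect
def log_worker_config(sender=None, conf=None, **kwargs):
    # sender is the worker's nodename, conf is the app's configuration
    print('worker %s starting with broker %s' % (sender, conf.broker_url))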
Upvotes: 35
Views: 50637
Reputation: 1
https://github.com/celery/celery/discussions/8154#discussioncomment-10558142
With eventlet, the worker would read messages but not run the task functions, so this is my solution. mainapp.py:
from celery import __main__
import sys

if __name__ == "__main__":
    # argv[0] is just the program name; note the subcommand is 'worker'
    sys.argv = ['celery', '-A', 'tasks.celery_app', 'worker', '-P', 'eventlet']
    __main__.main()
Upvotes: 0
Reputation: 1097
I would like to expand on Tomasz Hławiczka's answer for Celery 5+. As mentioned in that answer, the following works:
from celery import Celery

app = Celery()  # args and kwargs as needed

if __name__ == "__main__":
    worker = app.Worker()
    worker.start()
However, I stumbled upon this answer while trying to run the Celery worker alongside a Flask app with the Flask-SocketIO run method. Specifically, I tried to set up my worker and then run my Flask app, but worker.start() blocks, so the Flask app never ran. To solve this, I used the following:
import subprocess

from celery import Celery
from flask import Flask
from flask_socketio import SocketIO

cel = Celery()  # args and kwargs as needed
app = Flask(__name__)
socketio = SocketIO(app, message_queue=<redis or rabbitmq here>)

if __name__ == "__main__":
    cmd = "celery -A project.cel worker -Q specific_queue".split(' ')
    subprocess.Popen(cmd)
    socketio.run(app)
Or more generally:
import subprocess

from celery import Celery

app = Celery()  # args and kwargs as needed

if __name__ == "__main__":
    cmd = "celery -A project.app worker -Q specific_queue".split(' ')
    subprocess.Popen(cmd)
    # more code you need to run after starting worker
In addition, there are times when previously started workers are still running and you don't want to start more. This is particularly true during development. To check for other workers before starting another, you can do the following:
import subprocess

from celery import Celery

app = Celery()  # args and kwargs as needed

if __name__ == "__main__":
    active_workers = app.control.inspect().active()
    if active_workers is None:
        cmd = "celery -A project.app worker -Q specific_queue".split(' ')
        subprocess.Popen(cmd)
    # more code you need to run after starting worker
If you are doing all this across multiple hosts and you need to check for a specific worker, do the following:
import subprocess

from celery import Celery

app = Celery()  # args and kwargs as needed

if __name__ == "__main__":
    active_workers = app.control.inspect().active()
    if active_workers is None or "celery@hostname" not in active_workers:
        cmd = "celery -A project.app worker -Q specific_queue".split(' ')
        subprocess.Popen(cmd)
    # more code you need to run after starting worker
If you don't care about blocking and don't want to use the subprocess library, but you still want your worker to listen to a specific queue, do the following:
from celery import Celery

app = Celery()  # args and kwargs as needed

if __name__ == "__main__":
    worker = app.Worker(queues=["specific_queue"])
    worker.start()
Of course, if you want multiple workers, each listening to its own specific queue, you'd have to use the subprocess method, as you cannot start another worker after the first start() call blocks.
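A minimal sketch of that multi-worker variant, with hypothetical queue names (the -n flag gives each worker a unique node name):

import subprocess

from celery import Celery

app = Celery()  # args and kwargs as needed

if __name__ == "__main__":
    # hypothetical queue names; adjust to your project
    for queue in ["queue_a", "queue_b"]:
        cmd = f"celery -A project.app worker -Q {queue} -n {queue}@%h".split(' ')
        subprocess.Popen(cmd)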
Upvotes: 3
Reputation: 111
worker_main was put back in Celery 5.0.3 here: https://github.com/celery/celery/pull/6481
This worked for me on 5.0.4:
import os

app.worker_main(argv=['worker', '--loglevel=info',
                      '--concurrency={}'.format(os.environ['CELERY_CONCURRENCY']),
                      '--without-gossip'])
Upvotes: 11
Reputation: 501
On Celery 5.0 (before 5.0.3 restored it, per the answer above), calling worker_main results in:

AttributeError: 'Celery' object has no attribute 'worker_main'

Use app.Worker instead:
import celery

app = celery.Celery(
    'project',
    include=['project.tasks']
)

if __name__ == '__main__':
    worker = app.Worker(
        include=['project.tasks']
    )
    worker.start()
See celery.apps.worker and celery.worker.WorkController.setup_defaults for the available options (hopefully this will be documented better in the future).
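For instance, a minimal sketch passing a couple of those options as keyword arguments, assuming they map onto setup_defaults the way the linked code suggests:

import celery

app = celery.Celery('project', include=['project.tasks'])

if __name__ == '__main__':
    worker = app.Worker(
        loglevel='INFO',  # corresponds to the --loglevel CLI option
        concurrency=2,    # corresponds to the --concurrency CLI option
    )
    worker.start()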
Upvotes: 22
Reputation: 1635
I think you are just missing the program name as the first element of argv, so Celery can read the rest as its arguments, like:
queue = Celery('blah', include=['blah'])
queue.start(argv=['celery', 'worker', '-l', 'info'])
Upvotes: 4
Reputation: 497
Using the app.worker_main method (v3.1.12):
± cat start_celery.py
#!/usr/bin/python
from myapp import app

if __name__ == "__main__":
    argv = [
        'worker',
        '--loglevel=DEBUG',
    ]
    app.worker_main(argv)
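Then start it with python start_celery.py, or mark it executable (chmod +x start_celery.py) and run it directly thanks to the shebang line.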
Upvotes: 24
Reputation: 7028
Based on code from the Django-Celery module, you could try something like this:
from __future__ import absolute_import, unicode_literals

from celery import current_app
from celery.bin import worker

if __name__ == '__main__':
    app = current_app._get_current_object()
    worker = worker.worker(app=app)

    options = {
        'broker': 'amqp://guest:guest@localhost:5672//',
        'loglevel': 'INFO',
        'traceback': True,
    }

    worker.run(**options)
Upvotes: 14