Reputation: 5630
This question is a follow up of django + celery: disable prefetch for one worker, Is there a bug?
I had a problem with celery (see the question that I follow up) and in order to resolve it I'd like to have two celery workers with -concurrency 1 each but with two different settings of task_acks_late.
My current approach is working, but in my opinion not very beautiful. I am doing the following:
in settings.py
of my django project:
CELERY_TASK_ACKS_LATE = os.environ.get("LACK", "False") == "True"
This allows me to start the celery workers with following commands:
LACK=True celery -A miniclry worker --concurrency=1 -n w2 -Q=fast,slow --prefetch-multiplier 1
celery -A miniclry worker --concurrency=1 -n w1 -Q=fast
What would be more intuitive would be if I could do something like:
celery -A miniclry worker --concurrency=1 -n w2 -Q=fast,slow --prefetch-multiplier 1 --late-ack=True
celery -A miniclry worker --concurrency=1 -n w1 -Q=fast --late-ack=False
I found Initializing Different Celery Workers with Different Values but don't understand how to embed this in my django / celery context. In which files would I have to add the code that's adding an argument to the parser and how could I use the custom param to modify task_acks_late of the celery settings.
Update: Thanks to @Greenev's answer I managed to add custom options to celery. However it seems, that changing the config with this mechanism 'arrives too late' and the chagne is not taken into account.
Upvotes: 3
Views: 4563
Reputation: 909
One possible solution here is to provide acks_late=True
as an argument of the shared_task
decorator, given your code from the prior question:
@shared_task(acks_late=True)
def task_fast(delay=0.1):
logger.warning("fast in")
time.sleep(delay)
logger.warning("fast out")
UPD. I haven't got task_acks_late
to be set using this approach, but you could add a command line argument as follows.
You've already linked to a solution. I can't see any django specifics here, just put the parser.add_argument
code to where you have defined your app
, given your code from the prior question, you would have something like this:
app = Celery("miniclry", backend="rpc", broker="pyamqp://")
app.config_from_object('django.conf:settings', namespace='CELERY')
def add_worker_arguments(parser):
parser.add_argument('--late-ack', default=False)
app.user_options['worker'].add(add_worker_arguments)
Then you could access your argument value in celeryd_init
signal handler
@celeryd_init.connect
def configure_worker(sender=None, conf=None, options=None, **kwargs):
conf.task_acks_late = options.get('late-ack') # get custom argument value from options
Upvotes: 3