gelonida
gelonida

Reputation: 5630

Celery: different settings for task_acks_late per worker / add custom option to celery

This question is a follow up of django + celery: disable prefetch for one worker, Is there a bug?

I had a problem with celery (see the question that I follow up) and in order to resolve it I'd like to have two celery workers with -concurrency 1 each but with two different settings of task_acks_late.

My current approach is working, but in my opinion not very beautiful. I am doing the following:

in settings.py of my django project:

CELERY_TASK_ACKS_LATE = os.environ.get("LACK", "False") == "True"

This allows me to start the celery workers with following commands:

LACK=True celery -A miniclry worker --concurrency=1 -n w2 -Q=fast,slow --prefetch-multiplier 1 
celery -A miniclry worker --concurrency=1 -n w1 -Q=fast

What would be more intuitive would be if I could do something like:

celery -A miniclry worker --concurrency=1 -n w2 -Q=fast,slow --prefetch-multiplier 1 --late-ack=True
celery -A miniclry worker --concurrency=1 -n w1 -Q=fast --late-ack=False

I found Initializing Different Celery Workers with Different Values but don't understand how to embed this in my django / celery context. In which files would I have to add the code that's adding an argument to the parser and how could I use the custom param to modify task_acks_late of the celery settings.

Update: Thanks to @Greenev's answer I managed to add custom options to celery. However it seems, that changing the config with this mechanism 'arrives too late' and the chagne is not taken into account.

Upvotes: 3

Views: 4563

Answers (1)

Greenev
Greenev

Reputation: 909

One possible solution here is to provide acks_late=True as an argument of the shared_task decorator, given your code from the prior question:

@shared_task(acks_late=True)
def task_fast(delay=0.1):
    logger.warning("fast in")
    time.sleep(delay)
    logger.warning("fast out")

UPD. I haven't got task_acks_late to be set using this approach, but you could add a command line argument as follows.

You've already linked to a solution. I can't see any django specifics here, just put the parser.add_argument code to where you have defined your app, given your code from the prior question, you would have something like this:

app = Celery("miniclry", backend="rpc", broker="pyamqp://")
app.config_from_object('django.conf:settings', namespace='CELERY')

def add_worker_arguments(parser):
    parser.add_argument('--late-ack', default=False)

app.user_options['worker'].add(add_worker_arguments)

Then you could access your argument value in celeryd_init signal handler

@celeryd_init.connect
def configure_worker(sender=None, conf=None, options=None, **kwargs):
    conf.task_acks_late = options.get('late-ack') # get custom argument value from options

Upvotes: 3

Related Questions