Boris Gaganelov

Reputation: 327

Celery, RabbitMQ messages keep getting sent

Here is the setup: a Django project with Celery and a CloudAMQP RabbitMQ instance doing the message brokering.

My Celery/RabbitMQ settings:

# RabbitMQ & Celery settings
BROKER_URL = 'amqp://guest:guest@localhost:5672/' # Understandably fake
BROKER_POOL_LIMIT = 1
BROKER_CONNECTION_TIMEOUT = 30
BROKER_HEARTBEAT = 30
CELERY_SEND_EVENTS = False
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'

A docker container running celery with the following command:

bash -c 'cd django && celery -A pkm_main worker -E -l info --concurrency=3'

The shared_task definition:

from __future__ import absolute_import

import logging

from celery import shared_task

@shared_task
def push_notification(user_id, message):
    logging.critical('Push notifications sent')
    return {'status': 'success'}

And here is where I actually call it when something happens (I have omitted some of the code because it does not seem relevant):

from notifications.tasks import push_notification

def like_this(self, **args):
    # Do like stuff and then call .delay()
    push_notification.delay(media.user.id, request.user.username + ' has liked your item')

So when this is run, everything seems fine and dandy; the output looks like so:

worker_1 | [2016-03-25 09:03:34,888: INFO/MainProcess] Received task: notifications.tasks.push_notification[8443bd88-fa02-4ea4-9bff-8fbec8c91516]
worker_1 | [2016-03-25 09:03:35,333: CRITICAL/Worker-1] Push notifications sent
worker_1 | [2016-03-25 09:03:35,336: INFO/MainProcess] Task notifications.tasks.push_notification[8443bd88-fa02-4ea4-9bff-8fbec8c91516] succeeded in 0.444933412999s: {'status': 'success'}

So from what I gather, the task has been received and executed properly, so the messages should stop and RabbitMQ should go quiet.

But in my RabbitMQ Management I see messages getting published and delivered non-stop:

[Screenshot: RabbitMQ admin GUI showing the message rates]

What I'm gathering from this is that RabbitMQ is trying to send some sort of confirmation, failing, and retrying. Is there a way to actually turn this behavior off?

All help and advice is warmly welcomed.

EDIT: Forgot to mention something important - until I call push_notification.delay() the message tab is empty, save for the heartbeat that comes and goes every 30 seconds. Only after I have called .delay() does this happen.

EDIT 2: CELERYBEAT_SCHEDULE settings (I've tried running with and without them; there was no difference, but I'm adding them just in case):

from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
    "minutely_process_all_notifications": {
        'task': 'transmissions.tasks.process_all_notifications',
        'schedule': crontab(minute='*')
    }
}

EDIT 3: Added the view code. Also, I'm not actually using the CELERYBEAT_SCHEDULE; I'm just keeping the config in the code for future scheduled tasks.

from notifications.tasks import push_notification

class MediaLikesView(BaseView):
    def post(self, request, media_id):
        media = self.get_object(media_id)
        data = {}
        data['media'] = media.id
        data['user'] = request.user.id
        serializer = MediaLikeSerializer(data=data)
        if serializer.is_valid():
            like = serializer.save()
            push_notification.delay(media.user.id, request.user.username + ' has liked your item')
            serializer = MediaGetLikeSerializer(like)
            return self.get_mocked_pagination_response(status=status.HTTP_204_NO_CONTENT)
        return self.get_mocked_pagination_response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

Upvotes: 3

Views: 1579

Answers (1)

Carl Hörberg

Reputation: 6005

It's Celery's mingle and gossip. Disable them by adding --without-gossip --without-mingle --without-heartbeat to the worker's command-line arguments.
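For reference, a sketch of what the worker command from your question would look like with those flags added (same app and concurrency as above, just with gossip, mingle, and the worker heartbeat turned off):

bash -c 'cd django && celery -A pkm_main worker -E -l info --concurrency=3 --without-gossip --without-mingle --without-heartbeat'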

Also, don't forget to set BROKER_HEARTBEAT = None when you've disabled heartbeats on the command line, otherwise you'll be disconnected after 30s. It's most often better to rely on TCP keepalive than on AMQP heartbeats, or, even worse, Celery's own heartbeats.
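A minimal sketch of the matching settings change, assuming the settings block from your question (only BROKER_HEARTBEAT changes, so the client side agrees with the --without-heartbeat worker flag):

# RabbitMQ & Celery settings
BROKER_URL = 'amqp://guest:guest@localhost:5672/'
BROKER_POOL_LIMIT = 1
BROKER_CONNECTION_TIMEOUT = 30
BROKER_HEARTBEAT = None  # disabled to match --without-heartbeat on the worker
CELERY_SEND_EVENTS = False
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'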

Upvotes: 2
