Shikhar Thapliyal

Failed to deliver acknowledgement to RabbitMQ (Flask-SocketIO + gevent + Celery)

We are facing an issue while running a Flask application with Flask-SocketIO and a Celery background worker: Celery stops picking up tasks from the queue after successfully executing 2-3 tasks in a row. The idea is to create a socket connection between the client and the backend service and to emit to it from a background process. To emit/broadcast to the socket, we use RabbitMQ as the message queue when creating the Socket.IO instance:

socketio = SocketIO(app, cors_allowed_origins="*", async_mode='gevent', transports=['websocket'], message_queue=socket_io_rmq_url)
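For context, this relies on Flask-SocketIO's message-queue pattern: any process that creates a SocketIO instance bound to the same queue can publish events, and the server processes subscribed to that queue forward them to connected clients. A minimal sketch of the emit side (the event name and payload here are illustrative, not our exact code):

# Write-only emitter: no app and no server, just a publisher on the queue.
from flask_socketio import SocketIO

external_sio = SocketIO(message_queue=socket_io_rmq_url)

def notify_clients(payload):
    # Published to RabbitMQ; the gunicorn/gevent web workers subscribed to
    # the same queue relay it to the websocket clients.
    external_sio.emit('task_update', payload)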

We run the Flask application behind Gunicorn with the gevent worker class and monkey patch the Flask code:

gunicorn --preload --workers 10 --worker-class gevent --timeout 600 --log-level debug --access-logfile logs/gunicorn_access.log --error-logfile logs/gunicorn_error.log main:app --bind 0.0.0.0:5000 &

Monkey patching is done in config/app_config.py, which also creates the Flask app, the Socket.IO instance, and the Celery instance:

from gevent import monkey
monkey.patch_all()
from celery import Celery
from celery.signals import setup_logging
from threading import local
import logging.config
import os
from logstash.formatter import LogstashFormatterBase
from logging.config import dictConfig
from flask import Flask
from flask_socketio import SocketIO
from flask_cors import CORS

_locals = local()

app = Flask(__name__)

CORS(app)

BASE_DIR = os.getcwd()
LOGS_DIR = os.path.join(BASE_DIR, "logs")

RMQ_BROKER_URL = os.environ.get("RMQ_URL", "")
RMQ_SSL = RMQ_BROKER_URL.startswith("amqps")
MACHINE_HOST = os.environ.get("HOSTNAME", "")

celery = Celery(
    app.name,  # reuse the existing Flask app's name rather than creating a second app
    broker=RMQ_BROKER_URL
)
celery.conf.worker_prefetch_multiplier = 1  # fetch one message at a time
celery.conf.task_acks_late = False          # ack as soon as the task starts
celery.conf.update({
    'worker_hijack_root_logger': False,
    'worker_redirect_stdouts': False,
})

# Keep Socket.IO traffic on its own vhost by suffixing the broker vhost,
# e.g. amqps://user:pass@host/vhost -> amqps://user:pass@host/vhost_socketio
socket_io_rmq_list = RMQ_BROKER_URL.split("/")
socket_io_rmq_list[-1] = socket_io_rmq_list[-1] + "_socketio"
socket_io_rmq_url = "/".join(socket_io_rmq_list)

socketio = SocketIO(
    cors_allowed_origins="*",
    async_mode='gevent',
    transports=['websocket'],
    message_queue=socket_io_rmq_url
)

Celery command used to run the worker inside Docker:

celery -A main.celery worker --without-heartbeat --without-gossip --without-mingle --loglevel=debug --logfile=logs/celery_worker.log --concurrency=1 -n "${HOSTNAME}_METRIC_REFRESH_NEXTGEN" --max-tasks-per-child=1
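
The worker consumes tasks shaped roughly like this (a simplified sketch of api/tasks.py; the task name, event name, and arguments are placeholders):

from config.app_config import celery, socketio

@celery.task(name="metric_refresh")
def metric_refresh(room_id, data):
    # ... long-running work ...
    # app_config's socketio was created with message_queue=socket_io_rmq_url,
    # so this emit goes through RabbitMQ and is relayed to the websocket
    # clients by the gunicorn/gevent web workers.
    socketio.emit("refresh_complete", data, to=room_id)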

Code Structure:

│   Dockerfile
│   docker_entrypoint.sh
│   main.py
│   requirements.txt
│   __init__.py
│
├───api
│   │   auth.py
│   │   connectors.py
│   │   constants.py
│   │   tasks.py
│   │   views.py
│   │   __init__.py
│
├───config
│   │   app_config.py
│   │   exceptions.py
│   │   utils.py
│   │   __init__.py
│
├───entities
│       metric_refresh.py
│       metric_validations.py
│       sample_template.py
│       __init__.py

About the code structure:

The Flask main.py that the Gunicorn command loads:

from api.views import app, socketio
from config.app_config import celery  # exposed so "celery -A main.celery" can find the Celery app

if __name__ == '__main__':
    # Only runs when the module is executed directly; in production Gunicorn serves main:app.
    socketio.run(app, host="0.0.0.0", debug=False)

We have other services running the same stack, minus the Socket.IO integration, that work perfectly fine: Celery executes tasks and acknowledges them to RabbitMQ at the start of execution, since task_acks_late is False. But with the configuration above, where a socket connection is created and a background task emits through the message queue, Celery stops delivering acknowledgements to RabbitMQ after 1-2 successful tasks and the worker is reported as lost. The unacknowledged message then blocks the queue, and the worker does not pick up any new tasks.
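
To narrow down where the acknowledgement is lost, we can log around task execution with Celery's task signals (a minimal sketch; the logger name is arbitrary):

import logging
from celery.signals import task_prerun, task_postrun

diag_log = logging.getLogger("celery_diag")

@task_prerun.connect
def log_task_start(task_id=None, task=None, **kwargs):
    # With task_acks_late=False the ack should already have been sent
    # by the time this fires.
    diag_log.info("task %s (%s) starting", task.name, task_id)

@task_postrun.connect
def log_task_end(task_id=None, task=None, state=None, **kwargs):
    diag_log.info("task %s (%s) finished with state %s", task.name, task_id, state)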

Upvotes: 0

Views: 67

Answers (0)
