Jeff

Reputation: 1079

Redis connections not being released after Celery task is complete

I'm using Redis for two things: 1) As the Celery backend and 2) As a lock holder for my Celery task

Here's an example of the code I'm running:

def get_redis():
    url = os.environ.get("REDIS_URL")

    if url:
        r = redis.from_url(url)  # use secure for heroku
    else:
        r = redis.Redis()  # use unauthed connection locally

    return r

@app.task(bind=True, max_retries=10)
def test_delay_task(self, task_id):
    ''' Each task will try to grab a lock; once it does, it will sleep 5 seconds,
    then log and exit.
    '''
    have_lock = False
    r = get_redis()
    lock = r.lock('mws_api')
    try:
        have_lock = lock.acquire(blocking=False)
        if have_lock:
            logger.warning("{} Lock Acquired".format(task_id))
            time.sleep(5)
            logger.warning('Test Task {} successful!'.format(task_id))
        else:
            logger.warning("{} Lock In Use, Retrying".format(task_id))
            self.request.retries = 1
            self.retry(countdown=5 * random.uniform(0.8, 1.2))

    finally:
        if have_lock:
            lock.release()

        # We'll come back to this code, but it partially works
        # c = r.info()['connected_clients']
        # print("Disconnecting Redis | Connections: {}".format(c))
        # r.connection_pool.disconnect()


@app.task(bind=True, max_retries=10)
def test_parallel_tasks(self):
    ''' Queues 5 parallel tasks, each of which will try to grab the lock and run. '''
    for i in range(5):
        test_delay_task.delay(i)

When I run this, I get a large spike in connections to Redis. I'm measuring the connection count with this code:

def get_connected_clients():
    try:
        connections = 0
        while True:
            time.sleep(.25)
            c = get_redis().info()['connected_clients']
            # c = redis.Redis().info()['connected_clients']
            if c != connections:
                now = datetime.datetime.now()
                print("{} | Active Connections: {}".format(now, c))
                connections = c
            else:
                continue
    except KeyboardInterrupt:
        print("Shutting Down")

The results are this:

Celery Starts
2017-11-04 01:29:51.463512 | Active Connections: 7
2017-11-04 01:29:52.477220 | Active Connections: 12


Run Task
2017-11-04 01:30:18.755118 | Active Connections: 33
2017-11-04 01:30:23.847573 | Active Connections: 34
2017-11-04 01:30:24.101263 | Active Connections: 39
2017-11-04 01:30:24.610450 | Active Connections: 40
2017-11-04 01:30:28.944949 | Active Connections: 41
2017-11-04 01:30:30.208845 | Active Connections: 43
2017-11-04 01:30:33.780812 | Active Connections: 42
2017-11-04 01:30:34.548651 | Active Connections: 43
2017-11-04 01:30:34.804526 | Active Connections: 44
2017-11-04 01:30:35.058731 | Active Connections: 47
2017-11-04 01:30:39.626745 | Active Connections: 48
2017-11-04 01:30:40.648594 | Active Connections: 49
Task Complete

Wait

Kill Celery
2017-11-04 01:31:57.766001 | Active Connections: 45
2017-11-04 01:31:58.786042 | Active Connections: 5
2017-11-04 01:31:59.291814 | Active Connections: 3

These connections never go away, near as I can tell, unless I close Celery and restart it. Running the task again increases the number of open connections; the count never decreases until I close Celery. After 3 runs, the active connection count is up to 77.
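
For completeness, here's a small diagnostic sketch (separate from the task code above) that asks Redis itself who holds the open connections via redis-py's client_list(), grouping them by client address; this should show which processes the lingering connections belong to:

from collections import Counter

def show_client_addresses():
    ''' Diagnostic sketch: group Redis's view of its connected clients by address.
    Reuses the same get_redis() helper as above. '''
    clients = get_redis().client_list()  # one dict per connected client
    by_addr = Counter(c['addr'].split(':')[0] for c in clients)
    for addr, count in by_addr.most_common():
        print("{} | {} connections".format(addr, count))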


If I add in the commented code in my task above, it seems to help, but the total connections still seem too high to me. Running it multiple times now looks like this:

Started with Disconnect Code Uncommented
2017-11-04 01:37:44.773113 | Active Connections: 29
2017-11-04 01:37:54.689032 | Active Connections: 33
2017-11-04 01:37:59.789031 | Active Connections: 32
2017-11-04 01:38:01.057219 | Active Connections: 33
2017-11-04 01:38:02.330613 | Active Connections: 36
2017-11-04 01:38:06.139188 | Active Connections: 35
2017-11-04 01:38:07.917854 | Active Connections: 36
2017-11-04 01:38:13.016428 | Active Connections: 35
2017-11-04 01:39:11.848758 | Active Connections: 36
Second Run
2017-11-04 01:39:18.224475 | Active Connections: 38
2017-11-04 01:39:22.043765 | Active Connections: 37
2017-11-04 01:39:23.061727 | Active Connections: 38
2017-11-04 01:39:38.106320 | Active Connections: 37
Third Run
2017-11-04 01:40:49.623050 | Active Connections: 38
2017-11-04 01:40:54.480170 | Active Connections: 37
2017-11-04 01:40:55.501791 | Active Connections: 38
2017-11-04 01:41:00.330222 | Active Connections: 37
2017-11-04 01:41:03.643833 | Active Connections: 38
2017-11-04 01:41:08.735973 | Active Connections: 37
2017-11-04 01:41:10.257756 | Active Connections: 38
2017-11-04 01:41:15.348323 | Active Connections: 37
2017-11-04 01:41:17.137816 | Active Connections: 38
2017-11-04 01:41:22.241020 | Active Connections: 37

Ok, so all that said, my question is: why are my connections not closing, and how do I fix that? I'm going to need to run similar code for 100+ parallel tasks, not just the 5 I'm using here for my example.

Upvotes: 4

Views: 5108

Answers (1)

Oleg Kuralenko

Reputation: 11553

Here's the code that appears to be working; at least I can't reproduce the problem with it, unlike with the original code. Note the app.conf.broker_pool_limit = 0 setting and the connection_pool.disconnect() call. Here's what broker_pool_limit does:

The maximum number of connections that can be open in the connection pool. If set to None or 0 the connection pool will be disabled and connections will be established and closed for every use.

import os
import time
import random
import datetime
import logging
import redis

logging.basicConfig()
logger = logging.getLogger(__name__)

from celery import Celery
from celery.contrib import rdb
app = Celery('tasks', backend='redis://localhost', broker='redis://localhost')

app.conf.broker_pool_limit = 0

def get_redis():
    url = os.environ.get("REDIS_URL")

    if url:
        r = redis.from_url(url)  # use secure for heroku
    else:
        r = redis.Redis()  # use unauthed connection locally

    return r

@app.task(bind=True, max_retries=10)
def test_delay_task(self, task_id):
    ''' Each task will try to grab a lock; once it does, it will sleep 5 seconds,
    then log and exit.
    '''
    have_lock = False
    redis_cli = get_redis()
    lock = redis_cli.lock('mws_api')
    try:
        have_lock = lock.acquire(blocking=False)
        if have_lock:
            logger.warning("{} Lock Acquired".format(task_id))
            time.sleep(5)
            logger.warning('Test Task {} successful!'.format(task_id))
        else:
            logger.warning("{} Lock In Use, Retrying".format(task_id))
            self.request.retries = 1
            self.retry(countdown=5 * random.uniform(0.8, 1.2))
    finally:
        if have_lock:
            lock.release()
        redis_cli.connection_pool.disconnect()


@app.task(bind=True, max_retries=10)
def test_parallel_tasks(self):
    ''' Queues 5 parallel tasks, each of which will try to grab the lock and run. '''
    for i in range(5):
        test_delay_task.delay(i)


def get_connected_clients():
    try:
        connections = 0
        while True:
            time.sleep(.25)
            c = get_redis().info()['connected_clients']
            # c = redis.Redis().info()['connected_clients']
            if c != connections:
                now = datetime.datetime.now()
                print("{} | Active Connections: {}".format(now, c))
                connections = c
            else:
                continue
    except KeyboardInterrupt:
        print("Shutting Down")

When this code is run, every worker holds only a single connection once it has had a chance to handle a task, plus a handful of connections held by the master Celery process.

Connections math

For this script the master Celery process needs 8 connections, the ipython shell takes 4 connections after some tasks have been queued, and each Celery worker holds 1 connection once it has processed a task. So the initial spike is caused by the Celery master, which needs that many connections. Without broker_pool_limit set, it initially needs 10 connections.
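
As a rough sanity check, the math can be written out; the worker count below is an assumption for illustration (it depends on the concurrency you start celery with), not a number taken from the question:

master_conns = 8    # connections held by the master celery process
shell_conns = 4     # ipython shell after queueing some tasks
worker_count = 25   # assumed number of worker processes; adjust to your -c setting
expected = master_conns + shell_conns + worker_count * 1
print(expected)     # 37 with these example numbers, in the ballpark of the steady state logged above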

Upvotes: 2
