John Kolosky

Reputation: 51

What is the best solution for sharing data between processes(workers) in Celery - Python?

I have an application that reads float data from sensors every 100 milliseconds, appends it to a list, and every 5 minutes calculates some statistics from that list and inserts them into a MongoDB database. Then it clears the list and starts over.

There are many of these lists (one per sensor) and I need to read the data periodically, so I set up Celery workers. It works pretty well, but each Celery worker process has its own global variable space, so at insert time the lists hold different values depending on which worker actually writes to the database.

What is the solution for sharing data between workers and locking it somehow, so that multiple workers cannot each insert their own version of the sensor data into the database?

I have thought about Redis: appending the sensor data directly to a Redis structure, then every 5 minutes reading the data back from Redis, calculating the stats, clearing the Redis key, and so on.
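Roughly, that idea would look something like this (an untested sketch with redis-py; the key name 'sensor:1' and the literal reading are just placeholders):

import redis

r = redis.Redis()

# Every 100 milliseconds: append the newest reading to a per-sensor Redis list
r.rpush('sensor:1', 21.5)

# Every 5 minutes: read everything back, compute the stats, then clear the key
readings = [float(x) for x in r.lrange('sensor:1', 0, -1)]
r.delete('sensor:1')

My current code (without Redis) looks like this: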

import celery
import my_data_reader
import my_stats_calculator
import my_mongo_manager

app = celery.Celery('tasks', broker='redis://localhost')

sensor_data = []

data_reader = my_data_reader.TemperatureReader(1)
mongo_writer = my_mongo_manager.DataWriter()
stats_calculator = my_stats_calculator.Calculator()


# Runs every 100 milliseconds
@app.task
def update_sensors():

    global sensor_data
    global data_reader

    sensor_data.append(data_reader.get_data())

# Runs every 5 minutes
@app.task
def insert_to_database():

    global sensor_data
    global mongo_writer
    global stats_calculator

    stats_dict = stats_calculator.calculate_stats(sensor_data)
    mongo_writer.insert_data(stats_dict)
    del sensor_data[:]

After running this code with a single process (the --concurrency=1 Celery flag) it works absolutely fine; however, in the actual project there are over 25 sensors and I would like to perform these operations efficiently.

Does anybody know the proper way to share these objects between workers?

Upvotes: 2

Views: 4384

Answers (1)

John Kolosky

Reputation: 51

I figured out how to do this using Redis and a few additional pieces. I am posting working code; if someone knows a better solution, please post it here.

First I wrote a decorator for Celery tasks that prevents multiple workers from manipulating the Redis data at the same time. I did some research and found a lightweight one from this site.

However, I see there are other options to achieve this with third-party modules like sherlock or celery_once.
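For comparison, celery_once takes the enqueue-time approach (a duplicate of a task that is still pending is refused instead of being locked out at run time). Roughly, going by its README, usage looks like this - double-check the exact configuration keys in the celery_once documentation:

import celery
from celery_once import QueueOnce

app = celery.Celery('tasks', broker='redis://localhost')

# celery_once needs its own locking backend configuration
app.conf.ONCE = {
    'backend': 'celery_once.backends.Redis',
    'settings': {'url': 'redis://localhost:6379/0', 'default_timeout': 60 * 5},
}

@app.task(base=QueueOnce)
def insert_to_database():
    # Calling .delay() again while this task is still pending raises AlreadyQueued
    # (or is dropped silently with base=QueueOnce, once={'graceful': True})
    pass

Anyway, here is the lock decorator I went with: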

import celery
import redis
import pymongo
from datetime import datetime as dt

app = celery.Celery('tasks', broker='redis://localhost')
redis_client = redis.Redis()

def only_one(function=None, key="", timeout=None):
    """Enforce only one celery task at a time."""

    def _dec(run_func):
        """Decorator."""

        def _caller(*args, **kwargs):
            """Caller."""
            ret_value = None
            have_lock = False
            lock = redis_client.lock(key, timeout=timeout)
            try:
                have_lock = lock.acquire(blocking=False)
                if have_lock:
                    ret_value = run_func(*args, **kwargs)
            finally:
                if have_lock:
                    lock.release()

            return ret_value

        return _caller

    return _dec(function) if function is not None else _dec

Then implement the custom task classes - their run methods are now decorated with our Redis lock:

class SensorTask(app.Task):
    """A task."""

    @only_one(key='SensorTask', timeout=60 * 5)
    def run(self, **kwargs):
        # Append the newest reading to the end of the Redis list
        # (rpush keeps the values in chronological order, so in DatabaseTask
        # values[0] really is the first reading and values[-1] the last)
        redis_client.rpush('Sensor1', 1.50)


class DatabaseTask(app.Task):
    """A task."""

    # Database connection will stay the same in each process
    # See https://docs.celeryproject.org/en/latest/userguide/tasks.html
    _mongo_client = None

    @property
    def mongo_client(self):
        if self._mongo_client is None:
            self._mongo_client = pymongo.MongoClient()
        return self._mongo_client

    @only_one(key='DatabaseTask', timeout=60 * 5)
    def run(self, **kwargs):

        # Read current list of sensor values from Redis
        current_sensor_values = redis_client.lrange('Sensor1', 0, -1)

        # Convert the Redis list of bytes into a list of Python floats
        # (map vs. a list comprehension: map was a bit faster in my case)
        # values = [float(i) for i in current_sensor_values]
        values = list(map(float, current_sensor_values))

        # Example Mongo document to insert after 5 minutes of collecting data
        mongo_document = {
                'Timestamp': dt.now(),
                'first': values[0],
                'last': values[-1],
                'max' : max(values),
                'min' : min(values)
                }

        # Insert document to Mongo database and clean the Redis list
        self.mongo_client['Sensors']['Sensor1'].insert_one(mongo_document)
        redis_client.delete('Sensor1')
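One caveat: if the Redis list happens to be empty when the database task fires (for example right after a restart), values[0] raises an IndexError and max(values) a ValueError. A small guard at the top of run - just a sketch - skips the insert in that case:

    @only_one(key='DatabaseTask', timeout=60 * 5)
    def run(self, **kwargs):
        current_sensor_values = redis_client.lrange('Sensor1', 0, -1)

        # Nothing was collected in this window: skip the stats and the insert
        if not current_sensor_values:
            return

        # ... continue exactly as above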

The last step is to register our tasks with the Celery app:

update_sensor = app.register_task(SensorTask())
update_database = app.register_task(DatabaseTask()) 

Now it works pretty well with multiple workers. To run a task you need to call it through the created alias - in our case update_sensor.delay() and update_database.delay().
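To actually trigger them periodically (every 100 milliseconds for the sensor task, every 5 minutes for the database task), one option is a celery beat schedule - a rough sketch, with the intervals given in seconds and the registered task names taken from the task objects so they don't have to be hard-coded:

app.conf.beat_schedule = {
    'read-sensor-every-100ms': {
        'task': update_sensor.name,     # registered name of SensorTask
        'schedule': 0.1,
    },
    'insert-stats-every-5-minutes': {
        'task': update_database.name,   # registered name of DatabaseTask
        'schedule': 300.0,
    },
}

Then start a beat process next to the workers, for example celery -A tasks beat (or add -B to a single worker while testing).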

Upvotes: 3
