Reputation: 51
I have an application that reads float data from sensors every 100 milliseconds, appends it to a list, and every 5 minutes calculates some statistics from that list and inserts them into a MongoDB database. Then it clears the list and starts over.
There are as many of these lists as there are sensors, and I need to read the data periodically, so I set up Celery workers. It works fairly well, but each Celery worker has its own global variable space, so the lists being inserted into the database hold different values depending on which worker actually performs the insert.
What is the solution for sharing data between workers and locking it somehow, to prevent multiple workers from inserting their own version of the sensor data into the database?
I thought about Redis: appending the sensor data directly to a Redis dict, and every 5 minutes reading the data back from Redis, calculating the stats, cleaning the Redis dict and so on. Here is my current code:
import celery
import my_data_reader
import my_stats_calculator
import my_mongo_manager

app = celery.Celery('tasks', broker='redis://localhost')

sensor_data = []
data_reader = my_data_reader.TemperatureReader(1)
mongo_writer = my_mongo_manager.DataWriter()
stats_calculator = my_stats_calculator.Calculator()

# Runs every 100 milliseconds
@app.task
def update_sensors():
    global sensor_data
    global data_reader
    sensor_data.append(data_reader.get_data())

# Runs every 5 seconds
@app.task
def insert_to_database():
    global sensor_data
    global mongo_writer
    global stats_calculator
    stats_dict = stats_calculator.calculate_stats(sensor_data)
    mongo_writer.insert_data(stats_dict)
    del sensor_data[:]
After running this code with a single process (the --concurrency=1 Celery flag) it works absolutely fine; however, in the actual project there are over 25 sensors and I would like to handle these operations efficiently.
Does anybody know the proper way to share these objects between workers?
Upvotes: 2
Views: 4384
Reputation: 51
I figured out how to do this using Redis and a few additional pieces. Working code is below; if someone knows a better solution, please post it here.
First I wrote a decorator for Celery tasks that prevents multiple workers from manipulating the Redis data at the same time. I did some research and found a lightweight one on this site.
However, I see there are other options to achieve this using third-party modules such as sherlock or celery_once.
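For comparison, a minimal sketch of how the same locking could look with celery_once (this is based on its documented QueueOnce base class; the Redis URL, timeout and task body here are assumptions, not code from my project):

import celery
from celery_once import QueueOnce

app = celery.Celery('tasks', broker='redis://localhost')
# celery_once keeps its locks in its own backend (assumed to be Redis here)
app.conf.ONCE = {
    'backend': 'celery_once.backends.Redis',
    'settings': {
        'url': 'redis://localhost:6379/0',
        'default_timeout': 60 * 5,
    }
}

@app.task(base=QueueOnce, once={'graceful': True})
def insert_to_database():
    # While one worker holds the lock, duplicate invocations are
    # silently skipped ('graceful') instead of raising AlreadyQueued.
    pass

Below is the lightweight lock decorator I ended up using instead: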
import celery
import redis
import pymongo
from datetime import datetime as dt

app = celery.Celery('tasks', broker='redis://localhost')
redis_client = redis.Redis()

def only_one(function=None, key="", timeout=None):
    """Enforce only one celery task at a time."""
    def _dec(run_func):
        """Decorator."""
        def _caller(*args, **kwargs):
            """Caller."""
            ret_value = None
            have_lock = False
            lock = redis_client.lock(key, timeout=timeout)
            try:
                have_lock = lock.acquire(blocking=False)
                if have_lock:
                    ret_value = run_func(*args, **kwargs)
            finally:
                if have_lock:
                    lock.release()
            return ret_value
        return _caller
    return _dec(function) if function is not None else _dec
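The decorator can also be applied directly to a plain function-style task. A minimal sketch (the heartbeat task and counter key are only illustrations, not part of my project):

@app.task
@only_one(key='HeartbeatTask', timeout=60)
def heartbeat():
    # Only the worker that acquires the Redis lock executes the body;
    # any concurrent invocation just returns None.
    redis_client.incr('heartbeat_counter')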
Next, implement custom task classes whose run methods are decorated with the Redis lock:
class SensorTask(app.Task):
    """A task."""

    @only_one(key='SensorTask', timeout=60 * 5)
    def run(self, **kwargs):
        # Append some data to the Redis list
        redis_client.lpush('Sensor1', 1.50)

class DatabaseTask(app.Task):
    """A task."""

    # Database connection will stay the same in each process
    # See https://docs.celeryproject.org/en/latest/userguide/tasks.html
    _mongo_client = None

    @property
    def mongo_client(self):
        if self._mongo_client is None:
            self._mongo_client = pymongo.MongoClient()
        return self._mongo_client

    @only_one(key='DatabaseTask', timeout=60 * 5)
    def run(self, **kwargs):
        # Read the current list of sensor values from Redis
        current_sensor_values = redis_client.lrange('Sensor1', 0, -1)
        # Convert the Redis list to a Python float list
        # (map is a bit faster than a list comprehension in my case)
        # values = [float(i) for i in current_sensor_values]
        values = list(map(float, current_sensor_values))
        # Example Mongo document to insert after 5 minutes of collecting data
        mongo_document = {
            'Timestamp': dt.now(),
            'first': values[0],
            'last': values[-1],
            'max': max(values),
            'min': min(values)
        }
        # Insert the document into MongoDB and clear the Redis list
        self.mongo_client['Sensors']['Sensor1'].insert_one(mongo_document)
        redis_client.delete('Sensor1')
The last step is to register our tasks with the Celery app:
update_sensor = app.register_task(SensorTask())
update_database = app.register_task(DatabaseTask())
Now it works well with multiple workers. To run a task you need to call it through the created alias - in our case update_sensor.delay() and update_database.delay().
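To run them periodically (every 100 milliseconds for the sensor task and every 5 minutes for the database task, as described above), one option is a Celery beat schedule. A minimal sketch using the names of the registered tasks (the entry names and intervals are assumptions matching my use case):

app.conf.beat_schedule = {
    'update-sensor-every-100ms': {
        'task': update_sensor.name,
        'schedule': 0.1,  # seconds
    },
    'insert-to-database-every-5-minutes': {
        'task': update_database.name,
        'schedule': 60.0 * 5,
    },
}

Then start a beat process alongside the workers, e.g. celery -A tasks beat.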
Upvotes: 3