Reputation: 1516
I am building a simple waiting list app in Django 1.10.3 using Celery 4.1.0.
I have the following base tasks:
@shared_task
def start_user_counter():
    """Run when a user reaches the top of the waiting list.

    Currently only logs the UTC time of execution; the real work
    (sending an email, etc.) is meant to be added here.
    """
    executed_at = datetime.datetime.utcnow()
    logging.info('Task executed @ {}'.format(executed_at))
    # This task is executed when user reaches the Top of the queue.
    # Send email, perform other stuff in here ...
@shared_task
def update_queue():
    """Schedule ``start_user_counter`` to run 10 seconds from now.

    Logs the UTC time of the call and the value returned by
    ``apply_async`` for the newly scheduled task.
    """
    called_at = datetime.datetime.utcnow()
    logging.info('Task called @ {}'.format(called_at))

    run_at = called_at + datetime.timedelta(seconds=10)

    # Here, perform checks if task already exists in Redis
    # if it does not exist - create a new one and store it to Redis
    # if it does exist - update task's ETA.
    scheduled = start_user_counter.apply_async(eta=run_at)
    logging.info('Task ID: {}'.format(scheduled))
    # ...


update_queue.delay()
Each task represents one user on the waiting list. Each new user is assigned an ETA at which he is supposed to be removed from the waiting list (he reaches the top at the ETA). However, each user also has the possibility to speed up the time at which he will reach the top of the waiting list.
Question: How can I update the ETA of an already existing task so that it executes earlier than first anticipated?
Upvotes: 6
Views: 6066
Reputation: 1516
I have managed to solve this problem. My solution was to create a sorted set using Redis. For the score
value associated with each user entry in that set I used a timestamp
representing the time when the user was added to the waiting list. This helped me keep the users in the waiting list in the right order.
I also used a Redis hash for storing the celery.result.AsyncResult.id
that I received right after creating the Celery task with notify_user.apply_async((self.id,), eta=eta).id
(see more below).
Then, whenever I needed to update a task's ETA, I had to make the workers ignore the task by calling AsyncResult.revoke(),
like this: AsyncResult(self.get_task_id()).revoke().
AsyncResult(self.get_task_id())
returns an object for querying the state of the task associated with the id
that I got from calling self.get_task_id().
Calling .revoke()
on this AsyncResult
instance makes any worker receiving the task, or having reserved the task, ignore it.
This allowed me to create a completely new task with a new ETA, whose id
I would store back in the same user record in Redis, thus overwriting the old id
value.
My code example is specific to my case, but the bottom line is:
1. Store the celery.result.AsyncResult.id
somewhere (i.e. self.task_id = T.apply_async((args,), eta=eta).id).
2. Store the ETA as well (i.e. self.eta = eta).
3. When the ETA has to change, get the task with AsyncResult(task_id)
and ignore this task by calling the .revoke()
method on it (i.e. AsyncResult(self.task_id).revoke()).
4. Create a new task with the new ETA and store its id (i.e. self.task_id = T.apply_async((args,), eta=new_eta).id).
#utils.py
import datetime as dt

import redis
from celery.result import AsyncResult
from django.conf import settings

# The task scheduled for every waiting-list entry; tasks.py defines it
# under the name ``notify_user`` (the previous ``notify_candidate`` import
# referred to a name that does not exist there).
from .tasks import notify_user

# Prefix of the per-user Redis hash key ('user:data:<user_id>').
KEY_DATA = 'user:data'
# Name of the Redis sorted set that holds the waiting list.
KEY_QUEUE = 'user:queue'
TIME_DELTA = 'time_delta'
# Hash field under which the id of the user's pending Celery task is stored.
TASK_ID = 'task_id'
WAITING_TIME = 14 * 24 * 60 * 60  # 14 days by default

# Shared Redis connection configured from the Django settings.
r = redis.StrictRedis(
    host=settings.REDIS_HOST,
    port=settings.REDIS_PORT,
    db=settings.REDIS_DB,
)
class UserEntry(object):
    """Waiting-list entry for one user, stored in Redis and driven by Celery.

    The user's ETA is the score of their member in the sorted set
    ``KEY_QUEUE`` (a UTC epoch timestamp).  The id of the Celery task that
    will fire at that ETA is kept in the hash ``'<KEY_DATA>:<user_id>'``
    under the ``TASK_ID`` field.
    """

    def __init__(self, user_id):
        """Bind to ``user_id`` and create the entry if it does not exist yet.

        :param user_id: identifier of the user; used as sorted-set member
        """
        self.id = user_id
        # dynamically creates string for each user that will be later used
        # as a key for hash in our Redis storage
        self.user_key = '{}:{}'.format(KEY_DATA, user_id)
        self.create_or_update()

    @staticmethod
    def _eta_from_score(score):
        """Convert a queue score (UTC epoch seconds) to an aware datetime.

        :param score: epoch timestamp in seconds (int or float)
        :return: timezone-aware ``datetime`` in UTC
        """
        return dt.datetime.fromtimestamp(score, tz=dt.timezone.utc)

    def create_or_update(self, data=None):
        """
        Set up a new user entry, or update the stored data of an existing one.

        :param data: optional mapping of hash fields to store for an
                     existing user; ignored when empty or ``None``
        :return: None
        """
        if self.exist():
            # data exist for user with user_id - update it.  Skip the write
            # when there is nothing to store: hmset rejects an empty/None
            # mapping, which used to make UserEntry(<existing id>) raise.
            if data:
                r.hmset(self.user_key, data)
            return

        # this is a new user - create new entry for this user
        self.add_user()
        eta = self._eta_from_score(self.get_score())
        task_id = notify_user.apply_async((self.id,), eta=eta).id
        r.hmset(self.user_key, {TASK_ID: task_id})

    def add_user(self):
        """
        Appends user's ID to the end of the queue.

        :return: None
        """
        # zrank returns 0 for the first member, so compare against None
        # explicitly; a plain truthiness test would re-add (and reset the
        # score of) the user at the head of the queue.
        if self.get_index() is not None:
            # if user entry exits simulate NX option of zadd command -
            # Don't update already existing elements. Always add new elements.
            return
        # use UTC timestamp as score.  An aware "now" is required here:
        # calling .timestamp() on a naive utcnow() value would interpret it
        # as local time and shift the score by the server's UTC offset.
        now = dt.datetime.now(dt.timezone.utc)
        score = int(now.timestamp()) + WAITING_TIME
        # NOTE(review): positional (score, member) order matches the
        # redis-py 2.x StrictRedis API this snippet appears to target.
        r.zadd(KEY_QUEUE, score, self.id)

    def get_score(self):
        """
        Gets user's score (current ETA).

        :return: timestamp representing value of user's ETA
        """
        return r.zscore(KEY_QUEUE, self.id)

    def get_index(self):
        """
        Gets user's position in the queue.

        :return: 0-based index value representing user's position in the
                 queue
        """
        return r.zrank(KEY_QUEUE, self.id)

    def get_task_id(self):
        """
        Helper method to get task ID for the user

        :return: value of user task's ID, or ``None`` when no task id is
                 stored for this user
        """
        raw = r.hget(self.user_key, TASK_ID)
        if raw is None:
            return None
        return raw.decode('ascii') if isinstance(raw, bytes) else raw

    def set_score(self, score_delta):
        """
        Move user up in the queue by score value.

        :param score_delta: number of seconds added to the user's score
                            (current ETA); pass a negative value to move
                            the user up
        :return: value returned by ZINCRBY, i.e. the user's new score (ETA)
        """
        # NOTE(review): positional (member, amount) order matches the
        # redis-py 2.x StrictRedis API this snippet appears to target.
        return r.zincrby(KEY_QUEUE, self.id, score_delta)

    def exist(self):
        """
        Helper method used to define whether user exists in queue

        :return: dict of the hash's name/value pairs if data entry exist
                 (empty, hence falsy, otherwise)
        """
        return r.hgetall(self.user_key)

    def bump(self):
        """
        Move user up in the queue

        Revokes the currently scheduled task, moves the user's ETA one day
        earlier and schedules a replacement task for the new ETA.

        :return: None
        """
        if not self.exist():
            return
        # remove current task associated with the user
        old_task_id = self.get_task_id()
        if old_task_id:
            AsyncResult(old_task_id).revoke()
        # we need to decrement ETA, thus *(-1)
        # here I make time_delta equal to 1 day or 1 * 24 * 60 * 60 seconds
        time_delta = WAITING_TIME / 14 * -1
        new_score = self.set_score(time_delta)
        # The new ETA is the user's updated score, not the (negative) delta;
        # converting the delta itself produced a date in 1969.
        new_eta = self._eta_from_score(new_score)
        task_id = notify_user.apply_async((self.id,), eta=new_eta).id
        self.create_or_update({TASK_ID: task_id})
#tasks.py
import datetime
import logging
from celery import shared_task
@shared_task
def notify_user(user_id):
    """Notify the user who has reached the top of the waiting list.

    Currently only logs the UTC execution time and the user id; the real
    notification work is meant to be added here.

    :param user_id: identifier of the user the task was scheduled for
    """
    logging.info('Task executed @ {}'.format(datetime.datetime.utcnow()))
    # Was ``loging.info`` - a typo that raised NameError on every run.
    logging.info('UserID: {}'.format(user_id))
    # This task is executed when user reaches the Top of the queue.
    # Send email, perform other stuff in here ...
#models.py
from django.db.models.signals import post_save
from django.dispatch import receiver
from .utils import UserEntry
@receiver(post_save, sender=MyUser)
def create_user_entry_in_waiting_list(sender, instance=None, created=False, **kwargs):
    """Add a freshly created ``MyUser`` to the waiting list.

    Connected to ``post_save``; saves of already existing users are ignored.
    """
    if not created:
        return
    # create user entry in the waiting_list
    UserEntry(instance.id)
Upvotes: 4