Reputation: 1142
I'm trying to temporarily pause a Celery task when a user clicks a button.
What I've done so far:
When a user clicks the button, I send an AJAX request that updates my Celery task's state to "PAUSE".
The task itself runs a for loop. On every iteration I read the task's state from my database and check whether it is set to PAUSE. If it is, I want the task to sleep for 60 seconds, or sleep until the user hits the resume button; same idea.
This is my code:
import json
from time import sleep

import redis

r = redis.StrictRedis(host='localhost', port=6379, db=0)

@celery.task(bind=True)
def runTask(self, arr):
    for items in arr:
        current_task_id = self.request.id
        # Read this task's entry in the result backend and check its status.
        item = r.get('celery-task-meta-' + current_task_id)
        load_as_json = json.loads(item)
        if "PAUSE" in load_as_json['status']:
            sleep(50)

@app.route('/start')
def start_task():
    runTask.apply_async(args=[arr])
    return 'task started running'
Here is what my pause API endpoint looks like:
@app.route('/stop/<task_id>')
def updateTaskState(task_id):
    # task_id comes from the URL, matching the <task_id> route variable.
    key = 'celery-task-meta-' + str(task_id)
    loadAsJson = json.loads(r.get(key))
    loadAsJson['status'] = 'PAUSE'
    dump_as_json = json.dumps(loadAsJson)
    updated_state = r.set(key, dump_as_json)
    return 'updated state'
From what I understand conceptually, the reason I'm not seeing the updated state is that the task is already executing and isn't able to retrieve updated values from the database. FYI: the task state is set to PAUSE immediately. I checked this with a separate script that reads the state in a while loop: every time I click the button that sends the AJAX request, my db gets updated and the separate script prints "PAUSE". However, inside the function decorated with @celery.task I can't seem to get the updated state.
Below is the separate script I used to test. It seems to see the updated state as expected; I just can't get the updated state within the decorated task, weirdly.
r = redis.StrictRedis(host='localhost', port=6379, db=0)
last_key = r.keys()
while True:
    response = r.get('celery-task-meta-b1534a87-e18b-4f0a-89e2-08348d833056')
    loadAsJson = json.loads(response)
    print(loadAsJson['status'])
Upvotes: 1
Views: 4968
Reputation: 430
Faced with the same question and no good answers, I came up with a solution you might like. It does not depend on the message queue you are using (e.g. Redis or RabbitMQ). The key for me was that the update_state method of the celery.app.task.Task class takes task_id as an optional parameter. In my case I am running long-running file copy and checksum tasks across multiple worker nodes, and sometimes the user wants to pause one running task to reduce the load on the storage so that other tasks can finish first. I am also running a stateless Flask REST API to initiate the backend tasks and retrieve the status of running tasks, so I needed a way for an API call to pause and resume the tasks.
Here is my test function, which can receive a "message" to pause itself by monitoring its own state:
import time

@celery.task(bind=True)
def long_test(self, i):
    print('long test starting with delay of ' + str(i) + ' seconds on each loop')
    print('task_id = ' + str(self.request.id))
    self.update_state(state='PROCESSING')
    count = 0
    while True:
        # Look up this task's own state in the result backend.
        task = celery.AsyncResult(self.request.id)
        while task.state == 'PAUSING' or task.state == 'PAUSED':
            # Acknowledge the pause request once, then idle until the state changes.
            if task.state == 'PAUSING':
                self.update_state(state='PAUSED')
            time.sleep(i)
        if task.state == 'RESUME':
            self.update_state(state='PROCESSING')
        print('long test loop ' + str(count) + ' ' + str(task.state))
        count += 1
        time.sleep(i)
Then, in order to pause or resume I can do the following:
>>> from project.celeryworker.tasks import long_test
>>> from project import create_app, make_celery
>>> flaskapp = create_app()
>>> celery = make_celery(flaskapp)
>>> from celery.app.task import Task
>>> long_test.apply_async(kwargs={'i': 5})
<AsyncResult: bf19d50f-cf04-47f0-a069-6545fb253887>
>>> Task.update_state(self=celery, task_id='bf19d50f-cf04-47f0-a069-6545fb253887', state='PAUSING')
>>> celery.AsyncResult('bf19d50f-cf04-47f0-a069-6545fb253887').state
'PAUSED'
>>> Task.update_state(self=celery, task_id='bf19d50f-cf04-47f0-a069-6545fb253887', state='RESUME')
>>> celery.AsyncResult('bf19d50f-cf04-47f0-a069-6545fb253887').state
'PROCESSING'
>>> Task.update_state(self=celery, task_id='bf19d50f-cf04-47f0-a069-6545fb253887', state='PAUSING')
>>> celery.AsyncResult('bf19d50f-cf04-47f0-a069-6545fb253887').state
'PAUSED'
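The PAUSING / PAUSED / RESUME / PROCESSING handshake shown in the session above can also be tried out without a broker. What follows is only a rough sketch using the standard library, not Celery code: StateStore, swap, run_task, wait_for and demo are hypothetical names made up for illustration, a dict guarded by a lock stands in for the result backend, and a thread stands in for the worker.

```python
import threading
import time


class StateStore:
    """In-memory stand-in for a result backend (hypothetical, not a Celery API)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._states = {}

    def get(self, task_id):
        with self._lock:
            return self._states.get(task_id, 'PENDING')

    def set(self, task_id, state):
        with self._lock:
            self._states[task_id] = state

    def swap(self, task_id, expected, new):
        """Set `new` only if the current state is `expected`; return True on success."""
        with self._lock:
            if self._states.get(task_id, 'PENDING') != expected:
                return False
            self._states[task_id] = new
            return True


def run_task(store, task_id, steps, delay, log):
    """Do `steps` units of work, checking for a pause request before each one."""
    store.set(task_id, 'PROCESSING')
    done = 0
    while done < steps:
        state = store.get(task_id)
        if state in ('PAUSING', 'PAUSED'):
            # Acknowledge the request (no-op if already PAUSED), then idle.
            store.swap(task_id, 'PAUSING', 'PAUSED')
            time.sleep(delay)
            continue
        if state == 'RESUME':
            store.swap(task_id, 'RESUME', 'PROCESSING')
        log.append(done)  # the "work" for this step
        done += 1
        time.sleep(delay)
    store.set(task_id, 'SUCCESS')


def wait_for(store, task_id, state, timeout=5.0):
    """Poll until the task reaches `state`; return False if the timeout expires."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if store.get(task_id) == state:
            return True
        time.sleep(0.001)
    return False


def demo():
    store, log = StateStore(), []
    worker = threading.Thread(target=run_task, args=(store, 'task-1', 50, 0.01, log))
    worker.start()
    wait_for(store, 'task-1', 'PROCESSING')
    store.swap('task-1', 'PROCESSING', 'PAUSING')  # what a pause endpoint would do
    paused = wait_for(store, 'task-1', 'PAUSED')
    steps_at_pause = len(log)
    time.sleep(0.05)                               # no work should happen while paused
    idle_while_paused = len(log) == steps_at_pause
    store.set('task-1', 'RESUME')                  # what a resume endpoint would do
    worker.join(timeout=5.0)
    return paused, idle_while_paused, log, store.get('task-1')
```

Calling demo() pauses the worker partway through, checks that no steps are logged while it is paused, then resumes it and lets it finish. The swap helper is a compare-and-set, so a RESUME written between the worker's read and its acknowledgement is not overwritten with PAUSED; with a real backend a plain read followed by update_state is not atomic, so that guarantee would have to come from the store itself.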
Upvotes: 3