I'm new to Celery and a Python beginner overall. I have probably stumbled upon the right solution during my research, but I don't understand what I need to do for what seems to be a simple scenario. I followed the following guide to learn about Flask + Celery.
There seems to be something obvious I'm missing about how to trigger a task after the first one has finished. I tried callbacks, loops, and even Celery Flower and Celery beat, only to realise that those have nothing to do with what I'm doing...
After the form is submitted, I want to send an email with attachments (the result of the task) on success, or a failure email otherwise, without depending on what the user is doing in the app (no HTTP requests).
class ClassWithTheTask:
    def __init__(self, filename, proxies):
        # do stuff until a variable results is created
        self.results = 'this contains my result'
@app.route('/', methods=['GET', 'POST'])
@app.route('/index', methods=['GET', 'POST'])
def index():
    form = MyForm()
    if form.validate_on_submit():
        # ...
        # the task
        my_task = task1.delay(file_path, proxies)
        return redirect(url_for('taskstatus', task_id=my_task.id, filename=filename, email=form.email.data))
    return render_template('index.html',
                           form=form)
@celery.task(bind=True)
def task1(self, filepath, proxies):
    task = ClassWithTheTask(filepath, proxies)
    return results

@celery.task
def send_async_email(msg):
    """Background task to send an email with Flask-Mail."""
    with app.app_context():
        mail.send(msg)
@app.route('/status/<task_id>/<filename>/<email>')
def taskstatus(task_id, filename, email):
    task = task1.AsyncResult(task_id)
    if task.state == 'PENDING':
        # job did not start yet
        response = {
            'state': task.state,
            'status': 'Pending...'
        }
    elif task.state != 'FAILURE':
        response = {
            'state': task.state,
            'status': task.info.get('status', '')
        }
        if 'results' in task.info:
            response['results'] = task.info['results']
            response['untranslated'] = task.info['untranslated']
            msg = Message('Task Complete for %s !' % filename,
                          recipients=[email])
            msg.body = 'blabla'
            with app.open_resource(response['results']) as fp:
                msg.attach(response['results'], "text/csv", fp.read())
            with app.open_resource(response['untranslated']) as fp:
                msg.attach(response['untranslated'], "text/csv", fp.read())
            # the big problem here is that it will send the email only if the
            # user refreshes the page and gets the 'SUCCESS' status.
            send_async_email.delay(msg)
            flash('task finished. sent an email.')
            return redirect(url_for('index'))
    else:
        # something went wrong in the background job
        response = {
            'state': task.state,
            'status': str(task.info),  # this is the exception raised
        }
    return jsonify(response)
Upvotes: 1
I don't see the purpose of your status-check method. Anyway, what you are describing can be accomplished this way:
if form.validate_on_submit():
    # ...
    # the task
    my_task = (
        # the errback receives filename and email, matching send_error_email's parameters
        task1.s(file_path, proxies).set(link_error=send_error_email.s(filename, form.email.data))
        | send_async_email.s()
    ).delay()
    return redirect(url_for('taskstatus', task_id=my_task.id, filename=filename, email=form.email.data))
Then your error task will look like this. The normal task can stay the way it is.
from celery.result import AsyncResult

@celery.task
def send_error_email(task_id, filename, email):
    task = AsyncResult(task_id)
    .....
What happens here is that you are using a chain. You are telling Celery to run your task1; if that completes successfully, run send_async_email; if it fails, run send_error_email. This should work, but you might need to adapt the parameters, so consider it pseudocode.
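To make the success/failure dispatch concrete without needing a broker, here is a minimal sketch that swaps Celery for the standard library's concurrent.futures. It is not Celery code: the done-callback plays the role that the chain and link_error play above. The `sent` list, the `run_with_followup` helper, the file names and the email addresses are all hypothetical stand-ins, and a real app would call mail.send() where the sketch appends to `sent`.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical outbox; a real app would send mail instead of recording tuples.
sent = []


def task1(filepath, proxies):
    """Stand-in for the long-running job; fails when no file is given."""
    if not filepath:
        raise ValueError("no file given")
    return {"results": filepath + ".translated.csv"}


def send_success_email(result, email):
    sent.append(("success", email, result["results"]))


def send_error_email(exc, email):
    sent.append(("failure", email, str(exc)))


def run_with_followup(executor, filepath, proxies, email):
    """Submit task1 and attach a callback that fires when it finishes."""
    future = executor.submit(task1, filepath, proxies)

    def on_done(fut):
        # Runs as soon as the task ends, with no client request involved.
        exc = fut.exception()
        if exc is None:
            send_success_email(fut.result(), email)
        else:
            send_error_email(exc, email)

    future.add_done_callback(on_done)
    return future


# Leaving the with-block waits for both tasks and their callbacks.
with ThreadPoolExecutor(max_workers=2) as pool:
    run_with_followup(pool, "data.csv", None, "a@example.com")
    run_with_followup(pool, "", None, "b@example.com")
```

The point is the same as with the chain: the follow-up is attached when the task is submitted, so the email goes out whether or not the user ever reloads the status page.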
Upvotes: 2
This does not seem right at all:
def task1(self, filepath, proxies):
    task = ClassWithTheTask(filepath, proxies)
    return results
The line my_task = task1.delay(file_path, proxies) earlier in your code suggests you want to return task, but you return results, which is not defined anywhere. (ClassWithTheTask is also undefined.) This code would crash with a NameError, and your task would never complete.
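As a rough sketch of what the task body could return instead, assuming the status route keeps reading task.info as a dict with 'status', 'results' and 'untranslated' keys as it does in the question. The `untranslated` attribute, the generated file names and the name `task1_body` are hypothetical. The function is shown undecorated so that it runs without Celery; in the app it would carry @celery.task(bind=True) and take self as its first parameter.

```python
class ClassWithTheTask:
    """Stand-in for the question's class; the file names are made up."""

    def __init__(self, filename, proxies):
        # "do stuff until a variable results is created"
        self.results = filename + ".results.csv"
        self.untranslated = filename + ".untranslated.csv"


def task1_body(filepath, proxies):
    job = ClassWithTheTask(filepath, proxies)
    # Return plain, serializable data read from the instance,
    # not an undefined name and not the instance itself.
    return {
        "status": "Task completed!",
        "results": job.results,
        "untranslated": job.untranslated,
    }


info = task1_body("input.csv", None)
```

Returning a dict of strings also keeps the result easy to serialize for the result backend, which a custom class instance generally is not.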
Upvotes: 0