fassn

Reputation: 349

Celery: check if a task is completed to send an email to

I'm new to Celery and a Python noob overall. I have probably come across the right solution during my research, but I don't understand what I need to do for what seems to be a simple scenario. I followed a guide to learn about Flask + Celery.

What I understand:

There seems to be something obvious I'm missing about how to trigger a task after the first one has finished. I tried using callbacks and loops, and even tried Celery Flower and Celery beat, only to realise they have nothing to do with what I'm doing.

Goal:

After the form is filled in, I want to send an email with attachments (the result of the task), or a failure email otherwise, without depending on what the user is doing in the app (no HTTP requests).

My code:

class ClassWithTheTask:
    def __init__(self, filename, proxies):
        # do stuff until a variable results is created
        self.results = 'this contains my result'

@app.route('/', methods=['GET', 'POST'])
@app.route('/index', methods=['GET', 'POST'])
def index():
    form = MyForm()

    if form.validate_on_submit():
        # ...
        # the task
        my_task = task1.delay(file_path, proxies)
        return redirect(url_for('taskstatus', task_id=my_task.id, filename=filename, email=form.email.data))

    return render_template('index.html',
                           form=form)

@celery.task(bind=True)
def task1(self, filepath, proxies):
    task = ClassWithTheTask(filepath, proxies)
    return results

@celery.task
def send_async_email(msg):
    """Background task to send an email with Flask-Mail."""
    with app.app_context():
        mail.send(msg)

@app.route('/status/<task_id>/<filename>/<email>')
def taskstatus(task_id, filename, email):
    task = task1.AsyncResult(task_id)

    if task.state == 'PENDING':
        # job did not start yet
        response = {
            'state': task.state,
            'status': 'Pending...'
        }
    elif task.state != 'FAILURE':
        response = {
            'state': task.state,
            'status': task.info.get('status', '')
        }
        if 'results' in task.info:
            response['results'] = task.info['results']
            response['untranslated'] = task.info['untranslated']

        msg = Message('Task Complete for %s !' % filename,
                      recipients=[email])

        msg.body = 'blabla'
        with app.open_resource(response['results']) as fp:
            msg.attach(response['results'], "text/csv", fp.read())
        with app.open_resource(response['untranslated']) as fp:
            msg.attach(response['untranslated'], "text/csv", fp.read())

        # the big problem here is that the email is sent only if the user refreshes the page and gets the 'SUCCESS' status.

        send_async_email.delay(msg)
        flash('task finished. sent an email.')
        return redirect(url_for('index'))
    else:
        # something went wrong in the background job
        response = {
            'state': task.state,
            'status': str(task.info),  # this is the exception raised
        }
    return jsonify(response)

Upvotes: 1

Views: 2302

Answers (2)

birkett

Reputation: 10111

I don't get the goal of your status-check method. Anyway, what you are describing can be accomplished this way.

if form.validate_on_submit():
    # ...
    # run task1, then send_async_email on success; send_error_email on failure
    my_task = (
        task1.s(file_path, proxies).set(
            link_error=send_error_email.s(filename, form.email.data)
        )
        | send_async_email.s()
    ).delay()
    return redirect(url_for('taskstatus', task_id=my_task.id, filename=filename, email=form.email.data))

Then your error task will look like this. The normal task can stay the way it is.

from celery.result import AsyncResult

@celery.task
def send_error_email(task_id, filename, email):
    task = AsyncResult(task_id)
    .....

What happens here is that you are using a chain. You are telling Celery to run task1; if it completes successfully, run send_async_email; if it fails, run send_error_email. This should work, but you might need to adapt the parameters, so treat it as pseudocode.
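To make the success/failure branching concrete, here is a plain-Python sketch of the control flow the chain expresses. It deliberately uses only the standard library and is not Celery or Flask-Mail code: build_email, run_then_notify and the send callable are hypothetical names, and the attachment data is made up. It also builds the message from plain strings inside the background step, on the assumption that your task arguments have to be serializable (JSON is the default serializer in Celery 4 and later), which a ready-made Message object generally is not.

```python
from email.message import EmailMessage


def build_email(recipient, subject, body, attachments=()):
    """Build a message from plain data; attachments are (name, text) pairs."""
    msg = EmailMessage()
    msg["To"] = recipient
    msg["Subject"] = subject
    msg.set_content(body)
    for name, text in attachments:
        # Attach each CSV as text/csv with the given file name.
        msg.add_attachment(text, subtype="csv", filename=name)
    return msg


def run_then_notify(job, recipient, filename, send):
    """Run job(); send a success email with its output, or a failure email.

    job  -- hypothetical callable returning a list of (name, text) pairs
    send -- hypothetical callable that delivers the message (e.g. an SMTP wrapper)
    """
    try:
        attachments = job()
    except Exception as exc:
        # Failure branch: roughly what the link_error callback would do.
        msg = build_email(recipient, "Task failed for %s" % filename,
                          "Error: %s" % exc)
    else:
        # Success branch: roughly what the next task in the chain would do.
        msg = build_email(recipient, "Task complete for %s" % filename,
                          "Results attached.", attachments)
    send(msg)
    return msg
```

Neither branch depends on the user refreshing a status page, which is the point of moving the email step into the background work itself. In the real app the two branches would live in send_async_email and send_error_email, with the parameters adapted as mentioned above.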

Upvotes: 2

Louis

Reputation: 151401

This does not seem right at all:

def task1(self, filepath, proxies):
    task = ClassWithTheTask(filepath, proxies)
    return results

The line my_task = task1.delay(file_path, proxies) earlier in your code suggests you want to return task, but you return results, which is not defined anywhere. (ClassWithTheTask is also undefined.) This code would crash, and your task would never execute.
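A small standalone sketch of the problem, without Celery, in case it helps. It assumes the intent was to return the results attribute of the instance; the class body is a stand-in and task1_broken / task1_fixed are hypothetical names used only for the comparison.

```python
class ClassWithTheTask:
    def __init__(self, filepath, proxies):
        # Stand-in for the real work; only the attribute name comes from the question.
        self.results = "this contains my result"


def task1_broken(filepath, proxies):
    task = ClassWithTheTask(filepath, proxies)
    return results  # raises NameError: no variable called `results` exists here


def task1_fixed(filepath, proxies):
    task = ClassWithTheTask(filepath, proxies)
    return task.results  # read the attribute from the instance instead
```

Note that the status view in the question calls task.info.get(...), so the rest of the code may expect the task to return a dict rather than a string; that part is a guess about the intent.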

Upvotes: 0
