Richiedlon

Reputation: 111

Celery task queuing

I have created a Flask application, and it consists of two Celery tasks.

Task 1: Generate a file through a process

Task 2: Email the generated file

Normally, task 1 needs more time than task 2. I want to execute task 1 first and then task 2, but the problem is that both start executing at the same time inside Celery.

How can I resolve this issue?

import os
import smtplib
from email import encoders
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

@celery.task(name='celery_example.process')
def process(a, b, c, d, e, f):
    # Run rnx2rtkp to generate the post-processed position file oout.pos
    command = 'rnx2rtkp -p ' + a + ' -f ' + b + ' -m ' + c + ' -n -o oout.pos ' + d + ' ' + e + ' ' + f
    os.system(command)
    return 'Successfully created POS file'

@celery.task(name='celery_example.emailfile')
def emailfile(recipientemail):
    email_user = ''
    email_password = ''
    subject = 'subject'

    # Build the message and attach the generated POS file
    msg = MIMEMultipart()
    msg['From'] = email_user
    msg['To'] = recipientemail
    msg['Subject'] = subject
    body = 'This is your Post-Processed position file'
    msg.attach(MIMEText(body, 'plain'))

    filename = 'oout.pos'
    with open(filename, 'rb') as attachment:
        part = MIMEBase('application', 'octet-stream')
        part.set_payload(attachment.read())
    encoders.encode_base64(part)
    part.add_header('Content-Disposition', "attachment; filename= " + filename)
    msg.attach(part)

    # Send the message over SMTP
    text = msg.as_string()
    server = smtplib.SMTP('smtp.gmail.com', 587)
    server.starttls()
    server.login(email_user, email_password)
    server.sendmail(email_user, recipientemail, text)
    server.quit()
    return 'Email has been successfully sent'

This is the app.route:

@app.route('/pp.php', methods=['GET', 'POST'])
def pp():
    pp = My1Form()
    target = os.path.join(APP_ROOT)
    print(target)

    # Save the uploaded base observation file
    for fileBase in request.files.getlist("fileBase"):
        print(fileBase)
        filename = fileBase.filename
        destination = "/".join([target, filename])
        print(destination)
        fileBase.save(destination)

    # Save the uploaded rover observation file
    for fileObsRover in request.files.getlist("fileObsRover"):
        print(fileObsRover)
        filename = fileObsRover.filename
        destination = "/".join([target, filename])
        print(destination)
        fileObsRover.save(destination)

    # Save the uploaded rover navigation file
    for fileNavRover in request.files.getlist("fileNavRover"):
        print(fileNavRover)
        filename = fileNavRover.filename
        destination = "/".join([target, filename])
        print(destination)
        fileNavRover.save(destination)

        a = fileObsRover.filename
        b = fileBase.filename
        c = fileNavRover.filename
        elevation = pp.ema.data
        Freq = pp.frq.data
        posMode = pp.pmode.data
        emailAdd = pp.email.data

        # Queue the processing task and the email task
        process.delay(posMode, Freq, elevation, a, b, c)
        emailfile.delay(emailAdd)

        return render_template('results.html', email=pp.email.data, Name=pp.Name.data, ema=elevation, frq=Freq, pmode=posMode, fileBase=a)
    return render_template('pp.php', pp=pp)

Upvotes: 1

Views: 84

Answers (1)

donkopotamus

Reputation: 23176

As it currently stands, your code does the following:

# schedule process to run asynchronously
process.delay(posMode,Freq,elevation,a,b,c)

# schedule emailfile to run asynchronously
emailfile.delay(emailAdd)

Both of these will immediately be picked up by workers and executed. You have provided nothing to inform Celery that emailfile should wait until process is complete.

Instead you should:

  • alter the signature of emailfile to accept an extra parameter that will receive the result of a successful process call (a sketch of this is shown after the example below); then
  • call process using link.

For example:

deferred = process.apply_async(
    (posMode, Freq, elevation, a, b, c),
    link=emailfile.s(emailAdd))
deferred.get()   # optionally wait for process to finish and fetch its result
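
With link, Celery prepends the return value of process to the arguments of the linked signature, so emailfile has to accept it. A minimal sketch of the adjusted task, where the parameter name process_result is just illustrative:

@celery.task(name='celery_example.emailfile')
def emailfile(process_result, recipientemail):
    # process_result is whatever process returned,
    # e.g. 'Successfully created POS file'; the rest of the
    # task body (building and sending the email) stays as before.
    ...
    return 'Email has been successfully sent'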

An alternative to using link, but semantically identical in this case, would be to use a chain.
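
For reference, a minimal sketch of the chain form, using the same names as above (the result of process is again passed to emailfile as its first argument):

from celery import chain

# run process, then feed its result into emailfile along with the recipient address
deferred = chain(
    process.s(posMode, Freq, elevation, a, b, c),
    emailfile.s(emailAdd),
).apply_async()

A chain reads left to right and is a little easier to extend if you later add a third step after the email.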

Upvotes: 1
