Lhassan Baazzi

Reputation: 1150

Twisted: execute 10 threads at the same time and wait for the results

I'm writing a program that verifies the syntax and MX records of a list of emails. Because doing this with blocking code is time consuming, I want to do it asynchronously or with threads. This is my code:

with open(file_path) as f:
    # check the status of file, if away then file pointer will be at the last index
    if (importState.status == ImportStateFile.STATUS_AWAY):
        f.seek(importState.fileIndex, 0)

    while True:
        # the number of emails to process is configurable 10 or 20
        emails = list(islice(f, app.config['NUMBER_EMAILS_TO_PROCESS']))
        if len(emails) == 0:
            break

        importState.fileIndex = importState.fileIndex + len(''.join(emails))

        for email in emails:
            email = email.strip('''<>;,'\r\n ''').lower()
            d = threads.deferToThread(check_email, email)
            d.addCallback(save_email_status, email, importState)

        # set the number of emails processed 
        yield set_nbrs_emails_process(importState)

        # do an insert of all emails
        yield reactor.callFromThread(db.session.commit)

# set file status as success
yield finalize_import_file_state(importState)
reactor.callFromThread(reactor.stop)

Check email function:

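import subprocess

def check_email(email):
    # Run the external checker and capture whatever it prints on stdout.
    pipe = subprocess.Popen(["./check_email", '--email=%s' % email], stdout=subprocess.PIPE)
    # communicate() reads stdout to EOF and waits for the child to exit.
    status, _ = pipe.communicate()
    try:
        status = int(status)
    except ValueError:
        # Non-numeric output is treated as a failed check.
        status = -1

    return status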

What I need is to process 10 emails at the same time and wait for the results.

Upvotes: 1

Views: 346

Answers (1)

Jean-Paul Calderone

Reputation: 48315

I'm not sure why there are threads involved in your example code. You don't need threads to interact with email with Twisted, nor to do so concurrently.

If you have an asynchronous function that returns a Deferred, you can just call it ten times and the ten different streams of work will proceed in parallel:

for i in range(10):
    async_check_email_returning_deferred()

If you want to know when all ten results are available, you can use gatherResults:

from twisted.internet.defer import gatherResults
...
email_results = []
for i in range(10):
    email_results.append(async_check_email_returning_deferred())
all_results = gatherResults(email_results)

all_results is a Deferred that will fire when all of the Deferreds in email_results have fired (or when the first of them fires with a Failure).

Upvotes: 2
