Điệp

Timeout in concurrent.futures.ThreadPoolExecutor within a context manager does not work correctly

Can someone explain why the timeout doesn't work correctly when I use it within a context manager?

Without a context manager it works correctly: a TimeoutError is raised after 5 s. With a context manager, however, the exception is not raised after 5 s.

Implementation without a context manager:

import time
from concurrent import futures

MAX_WORKERS = 20


def download_one(c):
    time.sleep(100)  # simulate a slow download


def download_many():
    executor = futures.ThreadPoolExecutor(MAX_WORKERS)
    res = executor.map(download_one, [1, 2, 3, 4], timeout=5)
    results = list(res)  # consume the iterator once; a second list(res) would be empty
    print(results)

    return len(results)


download_many()

Implementation with a context manager:

import time
from concurrent import futures

MAX_WORKERS = 20


def download_one(c):
    time.sleep(100)  # simulate a slow download


def download_many():
    with futures.ThreadPoolExecutor(MAX_WORKERS) as executor:
        res = executor.map(download_one, [1, 2, 3, 4], timeout=5)
        results = list(res)  # consume the iterator once; a second list(res) would be empty
        print(results)

    return len(results)


download_many()


Answers (1)

wovano

The timeout works correctly in both cases. A concurrent.futures.TimeoutError is raised 5 seconds (the specified timeout) after executor.map() is called; it surfaces while list(res) iterates over the results, which in your code starts right after the map() call. However, when you use a context manager (the with statement), the executor's __exit__ method runs as the exception leaves the block. ThreadPoolExecutor.__exit__ calls shutdown(wait=True), so it waits until all worker threads have finished before leaving the context and letting the original error propagate.
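The with statement behaves roughly like a try/finally block that calls executor.shutdown(wait=True) on the way out; ThreadPoolExecutor.__exit__ does exactly that and does not suppress the exception. A minimal sketch of that desugaring, assuming scaled-down timings (1 s tasks and a 0.2 s timeout instead of 100 s and 5 s); slow_task and run_like_with_statement are hypothetical names:

```python
import time
from concurrent import futures


def slow_task(_):
    # Hypothetical stand-in for download_one, scaled down to 1 second.
    time.sleep(1.0)


def run_like_with_statement():
    """Roughly what `with ThreadPoolExecutor(...) as executor:` does on the error path."""
    executor = futures.ThreadPoolExecutor(max_workers=4)
    try:
        return list(executor.map(slow_task, range(4), timeout=0.2))
    finally:
        # Equivalent of __exit__: join every worker thread before the
        # TimeoutError raised by list() is allowed to propagate.
        executor.shutdown(wait=True)


start = time.monotonic()
try:
    run_like_with_statement()
except futures.TimeoutError:
    elapsed = time.monotonic() - start
print(f'TimeoutError reached the caller after {elapsed:.1f}s')
```

The measured time should be close to the 1 s task duration rather than the 0.2 s timeout: the error is raised on time, but the finally clause holds it back until the workers are done.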

This can be demonstrated by catching and logging the exception at the right places:

import logging
import time
from concurrent import futures

MAX_WORKERS = 20


def download_one(c):
    logging.info('download_one(%s)', c)
    time.sleep(10)  # Note: reduced to 10 seconds


def sub(executor):
    try:
        # Named results_iter so it does not shadow the futures module.
        results_iter = executor.map(download_one, [1, 2, 3, 4], timeout=5)
    except futures.TimeoutError:
        logging.info('map timed out!')  # this is never logged
        raise

    try:
        results = list(results_iter)
    except futures.TimeoutError:
        logging.info('list timed out!')  # here it happens!
        raise

    logging.info(results)
    logging.info('sub done')
    return len(results)


def download_many1():  # without context manager
    logging.info('download_many1')
    executor = futures.ThreadPoolExecutor(MAX_WORKERS)
    return sub(executor)


def download_many2():  # with context manager
    logging.info('download_many2')
    with futures.ThreadPoolExecutor(MAX_WORKERS) as executor:
        return sub(executor)


logging.basicConfig(level=logging.DEBUG, format='%(asctime)-15s   %(message)s')

logging.info('start 1')
try:
    download_many1()
except futures.TimeoutError:
    logging.info('timeout!')
finally:
    logging.info('1 finished\n')

logging.info('start 2')
try:
    download_many2()
except futures.TimeoutError:
    logging.info('timeout!')
finally:
    logging.info('2 finished\n')

This outputs:

2019-04-27 21:17:20,578   start 1
2019-04-27 21:17:20,578   download_many1
2019-04-27 21:17:20,578   download_one(1)
2019-04-27 21:17:20,578   download_one(2)
2019-04-27 21:17:20,578   download_one(3)
2019-04-27 21:17:20,578   download_one(4)
2019-04-27 21:17:25,593   list timed out!   # actual timeout after 5 seconds
2019-04-27 21:17:25,593   timeout!          # the timeout you see at the same time
2019-04-27 21:17:25,593   1 finished

2019-04-27 21:17:25,593   start 2
2019-04-27 21:17:25,593   download_many2
2019-04-27 21:17:25,593   download_one(1)
2019-04-27 21:17:25,593   download_one(2)
2019-04-27 21:17:25,593   download_one(3)
2019-04-27 21:17:25,593   download_one(4)
2019-04-27 21:17:30,610   list timed out!   # actual timeout after 5 seconds
2019-04-27 21:17:35,608   timeout!          # the timeout you see 5 seconds later!!
2019-04-27 21:17:35,608   2 finished
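To reproduce the gap between the two runs without waiting 10 seconds, here is a scaled-down sketch. It assumes 1 s tasks and a 0.2 s timeout stand in for the 10 s sleep and 5 s timeout; slow_task and seconds_until_timeout are hypothetical names mirroring download_many1 and download_many2:

```python
import time
from concurrent import futures


def slow_task(_):
    # Hypothetical stand-in for download_one, scaled down to 1 second.
    time.sleep(1.0)


def seconds_until_timeout(use_with):
    """Measure how long the caller waits before it sees the TimeoutError."""
    start = time.monotonic()
    try:
        if use_with:
            # Like download_many2: __exit__ joins the workers first.
            with futures.ThreadPoolExecutor(max_workers=4) as executor:
                list(executor.map(slow_task, range(4), timeout=0.2))
        else:
            # Like download_many1: nothing waits for the workers here.
            executor = futures.ThreadPoolExecutor(max_workers=4)
            list(executor.map(slow_task, range(4), timeout=0.2))
    except futures.TimeoutError:
        return time.monotonic() - start
    raise AssertionError('expected a TimeoutError')


plain = seconds_until_timeout(use_with=False)
managed = seconds_until_timeout(use_with=True)
print(f'without with: {plain:.1f}s, with with: {managed:.1f}s')
```

The first figure should be close to the timeout and the second close to the task duration; exact values depend on the machine. Note that in the plain case the worker threads keep sleeping in the background, since running threads cannot be interrupted.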
