Rufat

Reputation: 682

ThreadPoolExecutor works, but ProcessPoolExecutor gives exception

Environment: Python 3.6.8 on CentOS 8 and Windows 7. I am trying to speed up my code with multiprocessing, and I'm trying to figure out when to use ThreadPoolExecutor, when to use ProcessPoolExecutor, and what the differences between them are.

The following example code works fine:

# source code here: https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor-example
from concurrent import futures
import urllib.request

URLS = ["http://www.foxnews.com/",
        "http://www.cnn.com/",
        "http://europe.wsj.com/",
        "http://www.bbc.co.uk/",
        "http://some-made-up-domain.com/"]


def load_url(url, timeout):
    # Retrieve a single page and return its raw contents
    return urllib.request.urlopen(url, timeout=timeout).read()

def main1():
    with futures.ThreadPoolExecutor(max_workers=5) as executor:
        # Map each future to its URL so results can be matched as they complete
        future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}

        for future in futures.as_completed(future_to_url):
            url = future_to_url[future]
            try:
                print("%r page is %d bytes" % (url, len(future.result())))
            except Exception as e:
                print("%r generated an exception: %s" % (url, e))

if __name__ == "__main__":
    main1()

Output 1:

'http://some-made-up-domain.com/' page is 64668 bytes
'http://europe.wsj.com/' generated an exception: HTTP Error 403: Forbidden
'http://www.cnn.com/' page is 1146005 bytes
'http://www.bbc.co.uk/' page is 308991 bytes
'http://www.foxnews.com/' page is 328413 bytes

But when I replace ThreadPoolExecutor with ProcessPoolExecutor, the same code fails:

from concurrent import futures
import urllib.request

URLS = ["http://www.foxnews.com/",
        "http://www.cnn.com/",
        "http://europe.wsj.com/",
        "http://www.bbc.co.uk/",
        "http://some-made-up-domain.com/"]


def load_url(url, timeout):
    # Retrieve a single page and return its raw contents
    return urllib.request.urlopen(url, timeout=timeout).read()

def main2():
    with futures.ProcessPoolExecutor(max_workers=5) as executor:
        # Map each future to its URL so results can be matched as they complete
        future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}

        for future in futures.as_completed(future_to_url):
            url = future_to_url[future]
            try:
                print("%r page is %d bytes" % (url, len(future.result())))
            except Exception as e:
                print("%r generated an exception: %s" % (url, e))

if __name__ == "__main__":
    main2()

Output 2:

'http://some-made-up-domain.com/' page is 64668 bytes
Process Process-4:
Traceback (most recent call last):
  File "/usr/lib64/python3.6/concurrent/futures/process.py", line 175, in _process_worker
    r = call_item.fn(*call_item.args, **call_item.kwargs)
  File "process_pool_executor_so_2.py", line 12, in load_url
    return urllib.request.urlopen(url, timeout=timeout).read()
  File "/usr/lib64/python3.6/urllib/request.py", line 223, in urlopen
    return opener.open(url, data, timeout)
  File "/usr/lib64/python3.6/urllib/request.py", line 532, in open
    response = meth(req, response)
  File "/usr/lib64/python3.6/urllib/request.py", line 642, in http_response
    'http', request, response, code, msg, hdrs)
  File "/usr/lib64/python3.6/urllib/request.py", line 564, in error
    result = self._call_chain(*args)
  File "/usr/lib64/python3.6/urllib/request.py", line 504, in _call_chain
    result = func(*args)
  File "/usr/lib64/python3.6/urllib/request.py", line 756, in http_error_302
    return self.parent.open(new, timeout=req.timeout)
  File "/usr/lib64/python3.6/urllib/request.py", line 532, in open
    response = meth(req, response)
  File "/usr/lib64/python3.6/urllib/request.py", line 642, in http_response
    'http', request, response, code, msg, hdrs)
  File "/usr/lib64/python3.6/urllib/request.py", line 564, in error
    result = self._call_chain(*args)
  File "/usr/lib64/python3.6/urllib/request.py", line 504, in _call_chain
    result = func(*args)
  File "/usr/lib64/python3.6/urllib/request.py", line 756, in http_error_302
    return self.parent.open(new, timeout=req.timeout)
  File "/usr/lib64/python3.6/urllib/request.py", line 532, in open
    response = meth(req, response)
  File "/usr/lib64/python3.6/urllib/request.py", line 642, in http_response
    'http', request, response, code, msg, hdrs)
  File "/usr/lib64/python3.6/urllib/request.py", line 570, in error
    return self._call_chain(*args)
  File "/usr/lib64/python3.6/urllib/request.py", line 504, in _call_chain
    result = func(*args)
  File "/usr/lib64/python3.6/urllib/request.py", line 650, in http_error_default
    raise HTTPError(req.full_url, code, msg, hdrs, fp)
urllib.error.HTTPError: HTTP Error 403: Forbidden

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib64/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib64/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib64/python3.6/concurrent/futures/process.py", line 178, in _process_worker
    result_queue.put(_ResultItem(call_item.work_id, exception=exc))
  File "/usr/lib64/python3.6/multiprocessing/queues.py", line 341, in put
    obj = _ForkingPickler.dumps(obj)
  File "/usr/lib64/python3.6/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: cannot serialize '_io.BufferedReader' object
'http://www.foxnews.com/' generated an exception: A process in the process pool was terminated abruptly while the future was running or pending.
'http://www.cnn.com/' generated an exception: A process in the process pool was terminated abruptly while the future was running or pending.
'http://europe.wsj.com/' generated an exception: A process in the process pool was terminated abruptly while the future was running or pending.
'http://www.bbc.co.uk/' generated an exception: A process in the process pool was terminated abruptly while the future was running or pending.

Why does ThreadPoolExecutor handle the exception gracefully, so that when one thread fails the other threads still return their results, while with ProcessPoolExecutor a single failing process causes all the other processes to terminate with "A process in the process pool was terminated abruptly while the future was running or pending"?

How can I keep the remaining processes from terminating when one of them crashes?

Upvotes: 0

Views: 2717

Answers (1)

Szabolcs

Reputation: 4086

It seems that in 3.6.x an unhandled exception in one of the workers of a process pool causes all other workers to terminate as well. The traceback shows why: to report the failure, the worker has to pickle the HTTPError and send it back to the parent process over a queue, but the HTTPError holds the open server response (an _io.BufferedReader), which cannot be pickled. The resulting TypeError kills the worker process, and the executor then fails every remaining future with "A process in the process pool was terminated abruptly".
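Here is a minimal sketch of the failure, assuming only what the traceback shows: buffered readers are unpicklable, and HTTPError carries one as its fp attribute:

import io
import pickle
from urllib.error import HTTPError

# Buffered readers refuse to be pickled; this is the TypeError at the
# bottom of the worker's traceback.
reader = io.BufferedReader(io.BytesIO(b"response body"))
try:
    pickle.dumps(reader)
except TypeError as e:
    print(e)  # on 3.6: cannot serialize '_io.BufferedReader' object

# HTTPError keeps the server response as such a reader, so pickling
# the exception object fails the same way.
err = HTTPError("http://europe.wsj.com/", 403, "Forbidden", None,
                io.BufferedReader(io.BytesIO(b"")))
try:
    pickle.dumps(err)
except TypeError as e:
    print(e)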

One workaround I found for now is to catch all possible exceptions in the worker code and re-raise them as a plain, picklable Exception:

def load_url(url, timeout):
    try:
        return urllib.request.urlopen(url, timeout=timeout).read()
    except Exception as e:
        # Re-raise as a plain Exception: it carries only a string,
        # so it can be pickled and sent back to the parent process.
        raise Exception(str(e))

This way the other processes won't be affected. On the other hand, I couldn't reproduce the problem with 3.8.x.
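An equivalent pattern (my own sketch, not part of the fix above) is to not raise across the process boundary at all and return a picklable marker instead:

def load_url(url, timeout):
    # Return (data, error) so nothing unpicklable ever crosses
    # the process boundary
    try:
        return urllib.request.urlopen(url, timeout=timeout).read(), None
    except Exception as e:
        return None, str(e)

The caller then checks the error field of future.result() instead of relying on the exception machinery.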

Upvotes: 1
