Danilo Bassi

Reputation: 348

Is it safe to throw an exception in Python's ThreadPoolExecutor.map?

I created this example code to illustrate my question:

from concurrent.futures import ThreadPoolExecutor

def my_method(x):
    if x == 2:
        print("error")
        raise Exception("Exception raised")
    else:
        print("ok")

with ThreadPoolExecutor() as executor:
    executor.map(my_method, [1, 2, 3])

I have similar code in my app, but instead of printing a message it calls an AWS Lambda function. The problem is that some lines inside my_method, before the Lambda call, can raise an exception. If I don't read the results of executor.map, the code does not raise an exception and finishes every job that it can. In this case the output is:

> ok
> error
> ok
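A minimal sketch to make this behavior observable, assuming a hypothetical `calls` list (not in the original code) that records every argument my_method receives. Because map submits every task before returning its iterator, and the iterator is never advanced here, all three calls run and nothing is raised when the with block exits:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

calls = []                     # hypothetical: records each argument my_method was called with
calls_lock = threading.Lock()  # guards calls, since several worker threads append to it

def my_method(x):
    with calls_lock:
        calls.append(x)
    if x == 2:
        raise Exception("Exception raised")

with ThreadPoolExecutor() as executor:
    executor.map(my_method, [1, 2, 3])  # the returned iterator is never consumed

# Leaving the with block waits for all tasks; the exception from x == 2 stays
# inside its future, which is discarded, so nothing is raised here.
print(sorted(calls))
```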

I know that if I do something like this:

with ThreadPoolExecutor() as executor:
    for r in executor.map(my_method, [1, 2, 3]):
        print(r)

these lines will raise the exception, and some tasks may not be executed. In my case, that means not all AWS Lambda functions will be triggered. But if I call executor.map as in the first example, without consuming its results, my code triggers as many Lambda functions as possible. Is it safe to do that?

I'm asking because this might cause a memory leak, or some other problem I'm not aware of.

What is the best practice in this case?

Upvotes: 0

Views: 686

Answers (1)

Homer512

Reputation: 13295

You will not get a memory leak; the exception is simply stored in its future and released along with it once the future is no longer referenced. However, I recommend against using Executor.map like that. It works when you don't consume the map iterator because map submits all tasks up front, and each result (or exception) is stored in its future until the iterator asks for it. Since the iterator stops at the first exception, the intended behavior is clearly to stop execution, so I would not count on any specific behavior with respect to pending work items.
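A small sketch of the "stored in its future" part, using a variant of the question's my_method that returns its argument instead of printing (an assumption for illustration). Future.exception() waits for the task and hands back the stored exception, while Future.result() re-raises that same object in the calling thread:

```python
from concurrent.futures import ThreadPoolExecutor

def my_method(x):
    if x == 2:
        raise Exception("Exception raised")
    return x

with ThreadPoolExecutor() as executor:
    future = executor.submit(my_method, 2)
    # exception() blocks until the task is done, then returns the stored exception (or None)
    stored = future.exception()
    print(type(stored).__name__, stored)
    # result() re-raises the stored exception instead of returning a value
    try:
        future.result()
    except Exception as exc:
        print("result() re-raised the same object:", exc is stored)
```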

I see two possible ways forward:

  1. Catch the exception in the mapped function. It seems you want to ignore the error anyway, so why not do that explicitly?

  2. Switch to Executor.submit, which lets you handle exceptions much better.

from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor() as executor:
    futures = [executor.submit(my_method, arg) for arg in [1, 2, 3]]
    # wait until each task is done; exception() returns its exception or None
    exceptions = [future.exception() for future in futures]
# keep only actual exceptions
exceptions = [exception for exception in exceptions if exception is not None]
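For option 1, a minimal sketch, assuming a variant of the question's my_method that returns a string instead of printing, plus a hypothetical wrapper `safe_my_method`. The wrapper catches the exception inside the worker and returns it as a value, so map never re-raises, every input gets a result, and the failures can still be inspected afterwards:

```python
from concurrent.futures import ThreadPoolExecutor

def my_method(x):
    if x == 2:
        raise Exception("Exception raised")
    return f"ok {x}"

def safe_my_method(x):
    # Hypothetical wrapper: turn any exception into a return value
    # so that iterating over map() never raises.
    try:
        return my_method(x)
    except Exception as exc:
        return exc

with ThreadPoolExecutor() as executor:
    results = list(executor.map(safe_my_method, [1, 2, 3]))

errors = [r for r in results if isinstance(r, Exception)]
print(results)
print(len(errors), "task(s) failed")
```

Returning the exception rather than silently dropping it is a design choice; logging inside the except block would work just as well if the caller never needs the error objects.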

Looking at the CPython source code (concurrent/futures/_base.py), Executor.map is implemented like this (excerpt):

fs = [self.submit(fn, *args) for args in zip(*iterables)]

# Yield must be hidden in closure so that the futures are submitted
# before the first iterator value is required.
def result_iterator():
    try:
        # reverse to keep finishing order
        fs.reverse()
        while fs:
            # Careful not to keep a reference to the popped future
            if timeout is None:
                yield _result_or_cancel(fs.pop())
            else:
                yield _result_or_cancel(fs.pop(), end_time - time.monotonic())
    finally:
        for future in fs:
            future.cancel()
return result_iterator()

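So it is basically the same approach as mine. Note that once an exception is encountered, all remaining futures are cancelled. Future.cancel() only succeeds for futures that have not started running yet, so tasks already in progress still run to completion.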
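A sketch that makes the cancellation visible, under a few assumptions that are not part of the original code: a single worker thread (max_workers=1), a hypothetical `task` function, and a `threading.Event` that keeps item 1 busy. Item 0 fails, the map iterator raises, and its `finally` block cancels the futures for items 2 and 3 while they are still queued, so their bodies never run. Item 1 may or may not have started, depending on thread timing:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

ran = []                  # items whose task body actually started
gate = threading.Event()  # keeps item 1 busy until we release it

def task(x):
    ran.append(x)
    if x == 0:
        raise ValueError("task 0 failed")
    if x == 1:
        gate.wait()  # block the single worker so items 2 and 3 stay queued
    return x

with ThreadPoolExecutor(max_workers=1) as executor:
    try:
        for result in executor.map(task, [0, 1, 2, 3]):
            print(result)
    except ValueError as exc:
        # By now map's finally block has already cancelled the queued futures
        print("map stopped:", exc)
    finally:
        gate.set()  # release item 1 (if it started) so shutdown can finish

# Items 2 and 3 were cancelled before starting; item 1 depends on timing
print("tasks that started:", sorted(ran))
```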

Upvotes: 1
