user8852560

Using ThreadPoolExecutor without Blocking

As a follow-up to this question, I have a trivial script that starts a ThreadPoolExecutor to read in a JSON file. While that is running, I want the script to count from 1 to 9 using a for loop. For some reason, even though I call executor.shutdown(wait=False), it still blocks and waits for the read_employees function to finish.

According to the documentation:

If wait is False then this method will return immediately and the resources associated with the executor will be freed when all pending futures are done executing

import concurrent.futures
import json
import time


def read_employees(read_file):
    with open(read_file) as f_obj:
        employees = json.load(f_obj)

    for emp in employees:
        print(emp)
        time.sleep(3)


def start_thread():
    filename = 'employee.json'
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(read_employees, filename)
        executor.shutdown(wait=False)

def print_number():
    for num in range(1,10):
        time.sleep(2)
        print(num)


start_thread()
print_number()

If I do the same thing with a plain threading.Thread instead:

import json
import threading
import time


def read_employees(read_file):
    with open(read_file) as f_obj:
        employees = json.load(f_obj)

    for emp in employees:
        time.sleep(5)
        print(emp)


def print_number():
    for num in range(1,10):
        print(num)


filename = 'employee.json'
empThread = threading.Thread(target=read_employees, args=(filename,))
empThread.start()

print_number()

It counts from 1 to 9 first and then prints out the employees; the delay comes from the sleep inside read_employees. The output looks like this:

1
2
3
4
5
6
7
8
9
[email protected]
[email protected]

How do I get the same output using ThreadPoolExecutor, without blocking?

Upvotes: 8

Views: 8868

Answers (2)

Michael Hadam

Reputation: 320

I'd recommend that you don't use a with statement. When a with block finishes, Python calls the __exit__ method of the context manager it was given. A context manager is any object whose class implements __enter__ and __exit__ methods, so once everything inside the with block has run, __exit__ is called on that object.
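To make the __enter__/__exit__ sequence concrete, here is a minimal sketch using a hypothetical Tracer class (not part of any library) that simply records when each protocol method runs relative to the body of the with block.

```python
class Tracer:
    """Hypothetical context manager that records calls to its protocol methods."""

    def __init__(self):
        self.events = []

    def __enter__(self):
        self.events.append("enter")
        return self  # this is the object bound to the name after "as"

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.events.append("exit")
        return False  # do not suppress exceptions raised in the body


tracer = Tracer()
with tracer as t:
    t.events.append("body")

print(tracer.events)  # ['enter', 'body', 'exit']
```

__exit__ always runs after the last statement of the body, which is exactly where ThreadPoolExecutor does its waiting.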

In this case, ThreadPoolExecutor is the context manager. ThreadPoolExecutor is a subclass of Executor, and Executor's class definition shows that its __exit__ method calls self.shutdown(wait=True).

That call to self.shutdown(wait=True) is the problem. Since executor.shutdown(wait=False) is the last statement in your with block, __exit__ is called directly after it, which means self.shutdown(wait=True) runs next and waits for read_employees to finish. That is what's blocking you.
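Spelled out by hand, the with block in the question's start_thread behaves roughly like the sketch below. This is a simplification (the real with statement passes exception details to __exit__), and slow_task is a hypothetical stand-in for read_employees. The timings show that the explicit shutdown(wait=False) returns at once and the waiting happens inside __exit__.

```python
import concurrent.futures
import time


def slow_task():
    time.sleep(1)  # stand-in for read_employees
    return "done"


executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)
executor.__enter__()  # Executor.__enter__ just returns the executor
try:
    future = executor.submit(slow_task)
    t0 = time.monotonic()
    executor.shutdown(wait=False)  # returns immediately
    after_shutdown = time.monotonic() - t0
finally:
    t1 = time.monotonic()
    executor.__exit__(None, None, None)  # calls shutdown(wait=True) and blocks
    after_exit = time.monotonic() - t1

print(f"shutdown(wait=False) took {after_shutdown:.2f}s")
print(f"__exit__ took {after_exit:.2f}s")
```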

You have two options to fix this. The first is to subclass ThreadPoolExecutor and override its __exit__ method.
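A minimal sketch of the first option, assuming you only want leaving the with block to stop accepting new work rather than wait for it. NonBlockingThreadPoolExecutor is a hypothetical name, and slow_task stands in for read_employees.

```python
import concurrent.futures
import time


class NonBlockingThreadPoolExecutor(concurrent.futures.ThreadPoolExecutor):
    """Hypothetical subclass whose with block does not wait for pending work."""

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Executor.__exit__ calls shutdown(wait=True); use wait=False instead
        # so that leaving the with block returns immediately.
        self.shutdown(wait=False)
        return False  # do not suppress exceptions raised inside the block


def slow_task():
    time.sleep(1)  # stand-in for read_employees
    return "done"


start = time.monotonic()
with NonBlockingThreadPoolExecutor(max_workers=2) as executor:
    future = executor.submit(slow_task)
elapsed = time.monotonic() - start

print(f"with block exited after {elapsed:.2f}s")
print(future.result())  # the task still runs to completion in the background
```

Even with this override, the interpreter still joins the executor's worker threads before the process exits, so the script as a whole won't end until read_employees does; only the with block itself stops blocking.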

The second option is to do something like this:

def start_thread():
    filename = 'employee.json'
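    # Without a with statement, Executor.__exit__ (and its shutdown(wait=True))
    # is never called, so start_thread() returns right away.
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)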
    executor.submit(read_employees, filename)
    executor.shutdown(wait=False)
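A self-contained version of the second option, assuming an in-memory list in place of employee.json (the addresses are made up) and shorter sleeps so it finishes quickly. The log list is only there to record the order in which things happen.

```python
import concurrent.futures
import time

# Hypothetical stand-in for the contents of employee.json.
EMPLOYEES = ["alice@example.com", "bob@example.com"]

log = []  # records the order of events


def read_employees(employees):
    for emp in employees:
        time.sleep(0.5)  # simulate slow work
        log.append(emp)
        print(emp)


def start_thread():
    # No with statement, so nothing calls shutdown(wait=True) here.
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)
    future = executor.submit(read_employees, EMPLOYEES)
    executor.shutdown(wait=False)  # returns immediately
    return future


def print_number():
    for num in range(1, 10):
        log.append(num)
        print(num)


future = start_thread()
print_number()  # runs while read_employees is still sleeping
future.result()  # optional: wait for the background task before continuing
```

Returning the Future from start_thread is a design choice, not a requirement: it lets the caller decide if and when to wait, and future.result() re-raises any exception from read_employees that would otherwise go unnoticed.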

Upvotes: 22

Piotr Wilkin

Reputation: 3491

This is probably due to the following passage in the documentation:

"You can avoid having to call this method explicitly if you use the with statement, which will shutdown the Executor (waiting as if Executor.shutdown() were called with wait set to True)"

https://docs.python.org/3/library/concurrent.futures.html

Upvotes: 0
