Cnly

Reputation: 221

Python 3: How to properly add new Futures to a list while already waiting upon it?

I have a concurrent.futures.ThreadPoolExecutor and a list of futures. I submit work to the executor and record each future with the following code:

for id in id_list:
    future = self._thread_pool.submit(self.myfunc, id)
    self._futures.append(future)

And then I wait upon the list:

concurrent.futures.wait(self._futures)

However, self.myfunc does network I/O, so network exceptions are expected. When an error occurs, self.myfunc submits a new self.myfunc call with the same id to the same thread pool and adds the new future to the same list, just as above:

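try:
    do_stuff(id)
except Exception:  # a bare except would also swallow KeyboardInterrupt
    # Resubmit the same id and record the new future.
    future = self._thread_pool.submit(self.myfunc, id)
    self._futures.append(future)
    return None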

Here is the problem: the concurrent.futures.wait(self._futures) line raises an error:

File "/usr/lib/python3.4/concurrent/futures/_base.py", line 277, in wait
    f._waiters.remove(waiter)
ValueError: list.remove(x): x not in list

How should I properly add new Futures to a list while already waiting upon it?

Upvotes: 3

Views: 2735

Answers (1)

Tim Peters

Reputation: 70602

Looking at the implementation of wait(), it certainly doesn't expect that anything outside concurrent.futures will ever mutate the list passed to it. So I don't think you'll ever get that "to work". It's not just that it doesn't expect the list to mutate, it's also that significant processing is done on list entries, and the implementation has no way to know that you've added more entries.

Untested, I'd suggest trying this instead: skip all that, and just keep a running count of threads still active. A straightforward way is to use a Condition guarding a count.

Initialization:

self._count_cond = threading.Condition()
self._thread_count = 0

When myfunc is entered (i.e., when a new thread starts):

with self._count_cond:
    self._thread_count += 1

When myfunc is done (i.e., when a thread ends), for whatever reason (exceptional or not):

with self._count_cond:
    self._thread_count -= 1
    self._count_cond.notify() # wake up the waiting logic

And finally the main waiting logic:

with self._count_cond:
    while self._thread_count:
        self._count_cond.wait()

POSSIBLE RACE

It seems possible that the thread count could reach 0 after work for a new thread has been submitted, but before its myfunc invocation starts running (and so before _thread_count is incremented to account for the new thread).

So the:

with self._count_cond:
    self._thread_count += 1

part should instead be done right before each occurrence of

self._thread_pool.submit(self.myfunc, id)

Or write a new method to encapsulate that pattern; e.g., like so:

def start_new_thread(self, id):
    with self._count_cond:
        self._thread_count += 1
    self._thread_pool.submit(self.myfunc, id)
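
Putting the counting pieces together, a minimal runnable sketch might look like the following. The Fetcher class, the attempt parameter, done_ids, wait_all, and the simulated failure rule are all assumptions made up for illustration; only the Condition-guarded count and start_new_thread come from the snippets above.

```python
import concurrent.futures
import threading


class Fetcher:
    """Hypothetical stand-in for the class in the question."""

    def __init__(self, max_workers=4):
        self._thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers)
        self._count_cond = threading.Condition()
        self._thread_count = 0  # tasks submitted but not yet finished
        self.done_ids = []

    def start_new_thread(self, id, attempt=1):
        # Count the task before submitting it, so the count cannot
        # drop to 0 while a submitted task has not started yet.
        with self._count_cond:
            self._thread_count += 1
        self._thread_pool.submit(self.myfunc, id, attempt)

    def myfunc(self, id, attempt):
        try:
            # Simulated network failure (assumption): ids divisible
            # by 3 fail on their first attempt only.
            if attempt == 1 and id % 3 == 0:
                raise OSError("simulated network error")
            self.done_ids.append(id)
        except OSError:
            # The retry is counted before this task is uncounted below.
            self.start_new_thread(id, attempt + 1)
        finally:
            with self._count_cond:
                self._thread_count -= 1
                self._count_cond.notify()  # wake up the waiting logic

    def wait_all(self):
        with self._count_cond:
            while self._thread_count:
                self._count_cond.wait()


fetcher = Fetcher()
for i in range(10):
    fetcher.start_new_thread(i)
fetcher.wait_all()
fetcher._thread_pool.shutdown()
print(sorted(fetcher.done_ids))
```

Because a retry is counted inside the except block and the failing task is uncounted only in the finally block, the count stays above 0 for as long as any retry is pending.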

A DIFFERENT APPROACH

Offhand, I expect this could work too (but, again, haven't tested it): keep all your code the same except change how you're waiting:

while self._futures:
    self._futures.pop().result()

So this simply waits for one thread at a time, until none remain.

Note that .pop() and .append() on lists are atomic in CPython, so no need for your own lock. And because your myfunc() code appends the new future before the thread it's running in ends, the list won't become empty before all threads really are done.
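
A small sketch of this pop-and-wait loop, under the same caveats: the Fetcher class, the submit and wait_all helpers, the attempt parameter, and the simulated failure rule are hypothetical names added for illustration. Keep in mind that .result() re-raises any exception the task did not catch itself.

```python
import concurrent.futures


class Fetcher:
    """Hypothetical stand-in for the class in the question."""

    def __init__(self, max_workers=4):
        self._thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers)
        self._futures = []
        self.done_ids = []

    def submit(self, id, attempt=1):
        future = self._thread_pool.submit(self.myfunc, id, attempt)
        self._futures.append(future)

    def myfunc(self, id, attempt):
        try:
            # Simulated network failure (assumption): ids divisible
            # by 3 fail on their first attempt only.
            if attempt == 1 and id % 3 == 0:
                raise OSError("simulated network error")
            self.done_ids.append(id)
        except OSError:
            # The retry's future is appended before this task returns,
            # so the waiter always finds it in the list.
            self.submit(id, attempt + 1)
        return None

    def wait_all(self):
        # Only this loop pops, so pop() never sees an empty list.
        while self._futures:
            self._futures.pop().result()


fetcher = Fetcher()
for i in range(10):
    fetcher.submit(i)
fetcher.wait_all()
fetcher._thread_pool.shutdown()
print(sorted(fetcher.done_ids))
```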

AND YET ANOTHER APPROACH

Keep the original waiting code, but rework the rest so that an exception does not create a new thread. For example, rewrite myfunc to return True if it quits due to an exception and False otherwise, and start threads running a wrapper instead:

def myfunc_wrapper(self, id):
    keep_going = True
    while keep_going:
        # myfunc returns True when it failed and should be retried.
        keep_going = self.myfunc(id)

This may be especially attractive if you someday decide to use multiple processes instead of multiple threads (creating new processes can be a lot more expensive on some platforms).
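
As a rough illustration of the wrapper idea, the sketch below retries inside the same task, so the list passed to wait() never changes while waiting. The function is named myfunc here to match the question's code. The Fetcher class, the run method, done_ids, and the attempt argument (used only to simulate a failure on the first try) are assumptions, not part of the original code.

```python
import concurrent.futures


class Fetcher:
    """Hypothetical stand-in for the class in the question."""

    def __init__(self, max_workers=4):
        self._thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers)
        self._futures = []
        self.done_ids = []

    def myfunc(self, id, attempt):
        """Return True if the work failed and should be retried."""
        try:
            # Simulated network failure (assumption): ids divisible
            # by 3 fail on their first attempt only.
            if attempt == 1 and id % 3 == 0:
                raise OSError("simulated network error")
            self.done_ids.append(id)
        except OSError:
            return True
        return False

    def myfunc_wrapper(self, id):
        # Retry in the same task instead of submitting a new one.
        attempt = 0
        keep_going = True
        while keep_going:
            attempt += 1
            keep_going = self.myfunc(id, attempt)

    def run(self, id_list):
        for id in id_list:
            future = self._thread_pool.submit(self.myfunc_wrapper, id)
            self._futures.append(future)
        # The list is complete before waiting starts, so wait() is safe.
        return concurrent.futures.wait(self._futures)


fetcher = Fetcher()
done, not_done = fetcher.run(range(10))
fetcher._thread_pool.shutdown()
print(sorted(fetcher.done_ids))
```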

AND A WAY USING cf.wait()

Another way is to change just the waiting code:

while self._futures:
    fs = self._futures[:]
    for f in fs:
        self._futures.remove(f)
    concurrent.futures.wait(fs)

Clear? This makes a copy of the list to pass to .wait(), and the copy is never mutated. New threads show up in the original list, and the whole process is repeated until no new threads show up.
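
For completeness, a runnable sketch of this snapshot-and-wait loop. As with the other sketches, the Fetcher class, the submit and wait_all helpers, the attempt parameter, and the simulated failure rule are hypothetical; only the waiting loop itself is taken from the snippet above.

```python
import concurrent.futures


class Fetcher:
    """Hypothetical stand-in for the class in the question."""

    def __init__(self, max_workers=4):
        self._thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers)
        self._futures = []
        self.done_ids = []

    def submit(self, id, attempt=1):
        future = self._thread_pool.submit(self.myfunc, id, attempt)
        self._futures.append(future)

    def myfunc(self, id, attempt):
        try:
            # Simulated network failure (assumption): ids divisible
            # by 3 fail on their first attempt only.
            if attempt == 1 and id % 3 == 0:
                raise OSError("simulated network error")
            self.done_ids.append(id)
        except OSError:
            self.submit(id, attempt + 1)  # lands in the original list
        return None

    def wait_all(self):
        while self._futures:
            fs = self._futures[:]        # snapshot; never mutated
            for f in fs:
                self._futures.remove(f)  # leave only newer futures
            concurrent.futures.wait(fs)


fetcher = Fetcher()
for i in range(10):
    fetcher.submit(i)
fetcher.wait_all()
fetcher._thread_pool.shutdown()
print(sorted(fetcher.done_ids))
```

Each retry is appended before its parent task finishes, so once wait(fs) returns, every retry spawned by that batch is already in the original list for the next pass.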

Which of these ways makes most sense seems to me to depend mostly on pragmatics, but there's not enough info about all you're doing for me to make a guess about that.

Upvotes: 3
