Reputation: 221
I have a concurrent.futures.ThreadPoolExecutor
and a list. And with the following code I add futures to the ThreadPoolExecutor:
for id in id_list:
future = self._thread_pool.submit(self.myfunc, id)
self._futures.append(future)
And then I wait upon the list:
concurrent.futures.wait(self._futures)
However, self.myfunc
does some network I/O and thus there will be some network exceptions. When errors occur, self.myfunc
submits a new self.myfunc
with the same id
to the same thread pool and add a new future to the same list, just as the above:
try:
do_stuff(id)
except:
future = self._thread_pool.submit(self.myfunc, id)
self._futures.append(future)
return None
Here comes the problem: I got an error on the line of concurrent.futures.wait(self._futures)
:
File "/usr/lib/python3.4/concurrent/futures/_base.py", line 277, in wait
f._waiters.remove(waiter)
ValueError: list.remove(x): x not in list
How should I properly add new Futures to a list while already waiting upon it?
Upvotes: 3
Views: 2735
Reputation: 70602
Looking at the implementation of wait()
, it certainly doesn't expect that anything outside concurrent.futures
will ever mutate the list passed to it. So I don't think you'll ever get that "to work". It's not just that it doesn't expect the list to mutate, it's also that significant processing is done on list entries, and the implementation has no way to know that you've added more entries.
Untested, I'd suggest trying this instead: skip all that, and just keep a running count of threads still active. A straightforward way is to use a Condition
guarding a count.
Initialization:
self._count_cond = threading.Condition()
self._thread_count = 0
When my_func
is entered (i.e., when a new thread starts):
with self._count_cond:
self._thread_count += 1
When my_func
is done (i.e., when a thread ends), for whatever reason (exceptional or not):
with self._count_cond:
self._thread_count -= 1
self._count_cond.notify() # wake up the waiting logic
And finally the main waiting logic:
with self._count_cond:
while self._thread_count:
self._count_cond.wait()
It seems possible that the thread count could reach 0 while work for a new thread has been submitted, but before its my_func
invocation starts running (and so before _thread_count
is incremented to account for the new thread).
So the:
with self._count_cond:
self._thread_count += 1
part should really be done instead right before each occurrence of
self._thread_pool.submit(self.myfunc, id)
Or write a new method to encapsulate that pattern; e.g., like so:
def start_new_thread(self, id):
with self._count_cond:
self._thread_count += 1
self._thread_pool.submit(self.myfunc, id)
Offhand, I expect this could work too (but, again, haven't tested it): keep all your code the same except change how you're waiting:
while self._futures:
self._futures.pop().result()
So this simply waits for one thread at a time, until none remain.
Note that .pop()
and .append()
on lists are atomic in CPython, so no need for your own lock. And because your my_func()
code appends before the thread it's running in ends, the list won't become empty before all threads really are done.
Keep the original waiting code, but rework the rest not to create new threads in case of exception. Like rewrite my_func
to return True
if it quits due to an exception, return False
otherwise, and start threads running a wrapper instead:
def my_func_wrapper(self, id):
keep_going = True
while keep_going:
keep_going = self.my_func(id)
This may be especially attractive if you someday decide to use multiple processes instead of multiple threads (creating new processes can be a lot more expensive on some platforms).
Another way is to change just the waiting code:
while self._futures:
fs = self._futures[:]
for f in fs:
self._futures.remove(f)
concurrent.futures.wait(fs)
Clear? This makes a copy of the list to pass to .wait()
, and the copy is never mutated. New threads show up in the original list, and the whole process is repeated until no new threads show up.
Which of these ways makes most sense seems to me to depend mostly on pragmatics, but there's not enough info about all you're doing for me to make a guess about that.
Upvotes: 3