kuzzooroo
kuzzooroo

Reputation: 7408

Light persistence in the context of ThreadPoolExecutor in Python

I've got some Python code that farms out expensive jobs using ThreadPoolExecutor, and I'd like to keep track of which of them have completed so that if I have to restart this system, I don't have to redo the stuff that already finished. In a single-threaded context, I could just mark what I've done in a shelf. Here's a naive port of that idea to a multithreaded environment:

from concurrent.futures import ThreadPoolExecutor
import subprocess
import shelve


def do_thing(done, x):
    # Don't let the command run in the background; we want to be able to tell when it's done
    _ = subprocess.check_output(["some_expensive_command", x])
    done[x] = True


futs = []
with shelve.open("done") as done:
    with ThreadPoolExecutor(max_workers=18) as executor:
        for x in things_to_do:
            if done.get(x, False):
                continue
            futs.append(executor.submit(do_thing, done, x))
            # Can't run `done[x] = True` here--have to wait until do_thing finishes
        for future in futs:
            future.result()

    # Don't want to wait until here to mark stuff done, as the whole system might be killed at some point
    # before we get through all of things_to_do

Can I get away with this? The documentation for shelve doesn't contain any guarantees about thread safety, so I'm thinking no.

So what is the simple way to handle this? I thought that perhaps sticking done[x] = True in future.add_done_callback would do it, but that will often run in the same thread as the future itself. Perhaps there is a locking mechanism that plays nicely with ThreadPoolExecutor? That seems cleaner to me that writing a loop that sleeps and then checks for completed futures.

Upvotes: 1

Views: 650

Answers (1)

Julien
Julien

Reputation: 5729

While you're still in the outer-most with context manager, the done shelve is just a normal python object- it is only written to disk when the context manager closes and it runs its __exit__ method. It is therefore just as thread safe as any other python object, due to the GIL (as long as you're using CPython).

Specifically, the reassignment done[x] = True is thread safe / will be done atomically.

It's important to note that while the shelve's __exit__ method will run after a Ctrl-C, it won't if the python process ends abruptly, and the shelve won't be saved to disk.

To protect against this kind of failure, I would suggest using a lightweight file-based thread safe database like sqllite3.

Upvotes: 1

Related Questions