Benjamin

State reported by concurrent.futures does not reflect the true state

I am using concurrent.futures.ProcessPoolExecutor to run several instances of my code at once. While they run, I want to monitor the instances, and I use the future.running() and future.done() methods for that. I wrote a minimal example:

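import concurrent.futures
from random import random
from time import sleep

def dummy_solver(i):
    # simulate a job that takes a random time between 0 and 5 seconds
    sleep(random()*5)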
    return i

def foo():
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as pool:
        number_of_jobs = 6
        futures = [None] * number_of_jobs
        for job_number in range(len(futures)):
            futures[job_number] = pool.submit(dummy_solver, job_number)
        # poll the futures once per second and redraw the status line
        while True:
            msg = ""
            for future in futures:
                if future.running() is True:
                    part_msg = "Job Running       "
                if future.done() is True:
                    part_msg = "Job Done, Result:" + str(future.result())
                msg = msg + " | " + part_msg
            print("\r" + msg, end="")
            sleep(1)

This submits 6 jobs to the ProcessPoolExecutor, which can work on 2 jobs at once. As soon as I start foo, this is what I get on the console:

 | Job Running        | Job Running        | Job Running        | Job Running        | Job Running        | Job Running       

This suggests that all six jobs are running at once, but I expected only two of them to run at a time. What am I doing wrong?

Answers (1)

Tim Peters

Here's a start: add

           part_msg = ""

(or whatever msg you like best) as the first statement in the

      for future in futures:

loop. As-is, part_msg is not reset on each loop iteration, so if neither future.running() nor future.done() is true, part_msg just retains the value it had on the prior iteration. A pending job therefore shows whatever label the previous job in the list got.
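A minimal sketch of the loop body with part_msg reset on every iteration. The helper name status_line and the "Job Pending" label are illustrative assumptions, not part of the original code. The futures are built by hand with concurrent.futures.Future(), which the documentation allows for testing, so the output does not depend on timing.

```python
import concurrent.futures


def status_line(futures):
    """Build one status line for a list of futures (hypothetical helper)."""
    msg = ""
    for future in futures:
        # Reset part_msg for every future, so a pending future can no longer
        # inherit the label computed for the previous one.
        part_msg = "Job Pending       "
        if future.running():
            part_msg = "Job Running       "
        if future.done():
            part_msg = "Job Done, Result:" + str(future.result())
        msg = msg + " | " + part_msg
    return msg


# Hand-made futures in three different states (normally submit() creates them).
pending = concurrent.futures.Future()
running = concurrent.futures.Future()
running.set_running_or_notify_cancel()
finished = concurrent.futures.Future()
finished.set_result(3)

print(status_line([pending, running, finished]))
```

In the original foo(), the same three-way check can replace the body of the inner for loop unchanged.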

There's also a subtlety here: .running() really reports whether the future has gotten far enough along in the internal machinery that its execution can no longer be cancelled. So there's no guarantee that at most max_workers futures will report that they're "running". That depends on internal implementation details that can vary across releases. Typically, when a worker process starts a task, the machinery in the main program also enqueues another task waiting for a worker to become available, and at that point both tasks report running() even though only one of them is actually executing.
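To illustrate the link between running() and cancellability, the sketch below steps hand-made futures through their states. It assumes that an executor calls Future.set_running_or_notify_cancel() at the point where it commits to a work item; the documentation reserves that method for Executor implementations and unit tests, so calling it here only simulates that step.

```python
import concurrent.futures

# Hand-made futures stand in for the ones ProcessPoolExecutor.submit() returns.
queued = concurrent.futures.Future()   # still waiting, can be cancelled
claimed = concurrent.futures.Future()  # already committed to by the executor

# Simulate the executor committing to this work item. From here on the future
# reports running() == True, whether or not a worker has actually started it.
claimed.set_running_or_notify_cancel()

print(queued.running(), claimed.running())  # False True

# running() mirrors cancellability: a "running" future refuses cancel(),
# while a pending one can still be cancelled.
print(claimed.cancel())  # False
print(queued.cancel())   # True
print(queued.cancelled(), queued.running())  # True False
```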
