C. Cooney

Reputation: 591

Converting from ThreadPool to ProcessPoolExecutor

I have the following code, which I would like to convert from ThreadPool to ProcessPoolExecutor because it consists entirely of CPU-intensive calculations. When I watch the CPU monitor, I see that only one of my processor's 8 cores is being used.
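That symptom is expected with `multiprocessing.dummy`: it exposes the `Pool` API but runs every worker as a thread inside the calling process, and in CPython the GIL lets only one thread execute Python bytecode at a time. The short sketch below (the function name `where_am_i` is purely illustrative) shows that all tasks report the same process ID:

```python
import os
import threading
from multiprocessing.dummy import Pool as ThreadPool


def where_am_i(_):
    # Report which process and which thread ran this task
    return os.getpid(), threading.get_ident()


with ThreadPool(4) as pool:
    info = pool.map(where_am_i, range(8))

pids = {pid for pid, _ in info}
print(len(pids))  # 1: every worker thread lives in this one process
```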

import datetime
from multiprocessing.dummy import Pool as ThreadPool  # thread-based Pool, not processes


def thread_run(q, clients_credit_array, clients_terr_array,
               freq_small_list, freq_large_list, clients, year, admin):
    claim_id = []
    claim_client_id = []
    claim_company_id = []
    claim_year = []
    claim_type = []
    claim_closed = []
    claim_cnt = []
    claim_amount = []
    claim_index = 0  # running claim counter; must be initialized before the loop

    print(datetime.datetime.utcnow())
    i = 0
    client_cnt = 1000
    loop_incr = 8
    while i < client_cnt:
        # Build one argument tuple per client in this batch of loop_incr clients
        ind_rng = range(i, min(i + loop_incr, client_cnt))
        call_var = []
        for q in ind_rng:
            call_var.append((q,
                             clients_credit_array,
                             clients_terr_array,
                             freq_small_list,
                             freq_large_list,
                             clients,
                             year,
                             admin))

        # A new pool is created and torn down for every batch
        pool = ThreadPool(len(call_var))
        results = pool.map(call_claim, call_var)
        pool.close()
        pool.join()

        # Each result holds seven parallel lists; copy every claim into the columns
        for result in results:
            if not result[0]:
                continue
            r = 0
            while r < len(result[0]):  # loop over every claim, not just the first
                claim_index += 1
                claim_id.append(claim_index)
                claim_client_id.append(result[0][r])
                claim_company_id.append(result[1][r])
                claim_year.append(result[2][r])
                claim_type.append(result[3][r])
                claim_closed.append(result[4][r])
                claim_cnt.append(result[5][r])
                claim_amount.append(result[6][r])
                r += 1
        i += loop_incr
    print(datetime.datetime.utcnow())
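The result-copying loop can also be written with `zip`. This is a sketch under an assumption inferred from the indexing `result[0]` through `result[6]`: that `call_claim` returns a tuple of seven equal-length lists. The helper name `merge_results` and the sample data are hypothetical.

```python
from itertools import count


def merge_results(results, start_id=0):
    """Flatten per-client results into seven column lists plus claim ids.

    Assumes each result is a tuple of seven equal-length lists
    (client id, company id, year, type, closed, count, amount).
    """
    columns = [[] for _ in range(7)]
    claim_ids = []
    next_id = count(start_id + 1)
    for result in results:
        # zip(*result) yields one row per claim; an empty result yields nothing
        for row in zip(*result):
            claim_ids.append(next(next_id))
            for column, value in zip(columns, row):
                column.append(value)
    return claim_ids, columns


results = [
    ([], [], [], [], [], [], []),
    ([10, 11], [1, 1], [2020, 2020], ["a", "b"], [True, False], [1, 2], [5.0, 6.0]),
]
claim_ids, columns = merge_results(results)
print(claim_ids)   # [1, 2]
print(columns[0])  # [10, 11]
```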

The difficulty I am having, however, is that when I modify the code as follows, I get error messages:

from concurrent.futures import ProcessPoolExecutor as PThreadPool

        # ...inside the while loop, replacing the ThreadPool calls:
        pool = PThreadPool(max_workers=len(call_var))
        #pool = ThreadPool(len(call_var))
        results = pool.map(call_claim, call_var)
        #pool.close()
        #pool.join()

I had to remove pool.close() and pool.join() because they raised errors. But after removing them, my code still did not use multiple cores, and it ran much slower than the original. What am I missing?
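`ProcessPoolExecutor` has no `close()` or `join()` methods (its cleanup method is `shutdown()`), which is the likely source of those errors. The slowdown is plausibly because the loop starts a brand-new pool of worker processes for every batch of 8 clients (125 times for 1000 clients), and every task tuple, including the large arrays, is pickled and sent to a worker. A sketch under those assumptions creates one executor for the whole run and lets `map()` distribute all the work; `call_claim_stub`, `run_all`, and the `chunksize` value are hypothetical stand-ins, not the question's code.

```python
from concurrent.futures import ProcessPoolExecutor


def call_claim_stub(args):
    # Hypothetical stand-in for the question's CPU-bound call_claim
    q, factor = args
    return sum(i * factor for i in range(q * 10))


def run_all(client_cnt=1000, factor=3):
    call_var = [(q, factor) for q in range(client_cnt)]
    # One pool for all clients; leaving the with block calls
    # executor.shutdown(wait=True), so no close()/join() is needed
    with ProcessPoolExecutor() as executor:
        # chunksize batches tasks to cut per-task inter-process overhead
        results = list(executor.map(call_claim_stub, call_var, chunksize=50))
    return results


if __name__ == "__main__":
    results = run_all()
    print(len(results))  # 1000
```

The `if __name__ == "__main__":` guard matters here: on platforms that start workers by re-importing the main module (Windows, and macOS by default), omitting it makes each worker try to start its own pool.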

Upvotes: 0

Views: 153

Answers (1)

rhurwitz

Reputation: 2737

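As was pointed out in the comments, ProcessPoolExecutor has no close() or join() methods; its cleanup method is shutdown(). It is common to use an executor as a context manager, which calls shutdown(wait=True) when the with block exits, so no explicit cleanup is needed. Below is a simplified example that illustrates the concepts.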

Example:

import concurrent.futures
import random
import time
import os

values = [1, 2, 3, 4, 5]


def times_two(n):
    time.sleep(random.randrange(1, 5))
    print("pid:", os.getpid())
    return n * 2


def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        results = executor.map(times_two, values)

        for one_result in results:
            print(one_result)


if __name__ == "__main__":
    main()

Output (the PIDs differ from run to run; the results are returned in input order):

pid: 396
pid: 8904
pid: 25440
pid: 20592
pid: 14636
2
4
6
8
10
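Applied to the question, another likely cost is that the large read-only inputs (such as `clients_credit_array`) travel inside every task tuple, so they are pickled once per task. `ProcessPoolExecutor` accepts `initializer` and `initargs` (Python 3.7+), which run once in each worker process. The sketch below assumes the per-client work only needs read access to those arrays; `_init_worker`, `score_client`, and `run` are hypothetical names, and `score_client` is a toy calculation, not the real `call_claim`.

```python
from concurrent.futures import ProcessPoolExecutor

# Module-level slot filled once in each worker process by _init_worker
_shared = {}


def _init_worker(credit_array, terr_array):
    # Runs once per worker: the large read-only inputs are pickled
    # once per process instead of once per task
    _shared["credit"] = credit_array
    _shared["terr"] = terr_array


def score_client(q):
    # Hypothetical per-client computation standing in for call_claim
    return _shared["credit"][q] * _shared["terr"][q]


def run(credit_array, terr_array):
    with ProcessPoolExecutor(initializer=_init_worker,
                             initargs=(credit_array, terr_array)) as executor:
        # Only the small client index is sent with each task
        return list(executor.map(score_client, range(len(credit_array)),
                                 chunksize=100))


if __name__ == "__main__":
    credit = list(range(1000))
    terr = [2] * 1000
    print(run(credit, terr)[:5])  # [0, 2, 4, 6, 8]
```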

Upvotes: 1
