Reputation: 591
I have the following code, which I would like to convert from ThreadPool to ProcessPoolExecutor because it consists entirely of CPU-intensive calculations. When I watch the CPU monitor, I see that my 8-core processor is only using a single thread.
import datetime
from multiprocessing.dummy import Pool as ThreadPool

def thread_run(q, clients_credit_array, clients_terr_array,
               freq_small_list, freq_large_list, clients, year, admin):
    claim_id = []
    claim_client_id = []
    claim_company_id = []
    claim_year = []
    claim_type = []
    claim_closed = []
    claim_cnt = []
    claim_amount = []
    claim_index = 0  # running claim counter (assumed to start at 0; not initialised in the excerpt)
    print(datetime.datetime.utcnow())
    i = 0
    client_cnt = 1000
    loop_incr = 8
    while i < client_cnt:
        ind_rng = range(i, min((i + loop_incr), (client_cnt)), 1)
        call_var = []
        for q in ind_rng:
            call_var.append((q,
                             clients_credit_array,
                             clients_terr_array,
                             freq_small_list,
                             freq_large_list,
                             clients,
                             year,
                             admin))
        # A new pool is created for every batch of loop_incr clients
        pool = ThreadPool(len(call_var))
        results = pool.map(call_claim, call_var)
        pool.close()
        pool.join()
        for result in results:
            if result[0] == []:
                pass
            else:
                r = 0
                while r < len(result[0]):
                    claim_index += 1
                    claim_id.append(claim_index)
                    claim_client_id.append(result[0][r])
                    claim_company_id.append(result[1][r])
                    claim_year.append(result[2][r])
                    claim_type.append(result[3][r])
                    claim_closed.append(result[4][r])
                    claim_cnt.append(result[5][r])
                    claim_amount.append(result[6][r])
                    r += 1
        i += loop_incr
    print(datetime.datetime.utcnow())
The difficulty I am having, however, is that when I modify the code as follows, I get error messages:
from concurrent.futures import ProcessPoolExecutor as PThreadPool
pool = PThreadPool(max_workers=len(call_var))
#pool = ThreadPool(len(call_var))
results = pool.map(call_claim, call_var)
#pool.close()
#pool.join()
I had to remove pool.close() and pool.join() because they generated errors. But after removing them, my code still was not using multiple processors, and it ran much slower than the original. What am I missing?
Upvotes: 0
Views: 153
Reputation: 2737
As was pointed out in the comments, it is common to see Executor used as part of a context manager and without the need for join or close operations. Below is a simplified example to illustrate the concepts.
Example:
import concurrent.futures
import random
import time
import os

values = [1, 2, 3, 4, 5]

def times_two(n):
    time.sleep(random.randrange(1, 5))
    print("pid:", os.getpid())
    return n * 2

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        results = executor.map(times_two, values)
        for one_result in results:
            print(one_result)

if __name__ == "__main__":
    main()
Output:
pid: 396
pid: 8904
pid: 25440
pid: 20592
pid: 14636
2
4
6
8
10
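Applied to the code in the question, the same pattern might look like the sketch below. It is only an illustration under assumptions: call_claim here is a hypothetical stand-in (the real one is not shown in the question), the argument tuple is cut down to (q, year), and collect and run are names invented for this example. The main change is that one pool is created for the whole job rather than a new pool for every batch of 8, which is a likely contributor to the slowdown, since each new ProcessPoolExecutor has to start fresh worker processes.

```python
from concurrent.futures import ProcessPoolExecutor


def call_claim(args):
    """Hypothetical stand-in for the asker's call_claim.

    Takes one argument tuple and returns seven parallel lists,
    matching the result[0]..result[6] layout used in the question.
    """
    q, year = args
    if q % 2:  # pretend odd-numbered clients have no claims
        return ([], [], [], [], [], [], [])
    return ([q], [1], [year], ["small"], [True], [1], [100.0 * q])


def collect(results):
    """Flatten the per-client results into one row per claim."""
    rows = []
    claim_index = 0
    for result in results:
        # zip(*result) walks the seven parallel lists together;
        # a result made of empty lists contributes no rows.
        for fields in zip(*result):
            claim_index += 1
            rows.append((claim_index, *fields))
    return rows


def run(client_cnt=1000, year=2020, max_workers=None):
    # Build every argument tuple up front instead of in batches of 8.
    call_var = [(q, year) for q in range(client_cnt)]
    # One pool for the whole job. Leaving the with-block waits for the
    # workers and shuts the pool down, so no close()/join() is needed.
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        # chunksize sends tasks to workers in groups to cut overhead.
        results = executor.map(call_claim, call_var, chunksize=50)
        return collect(results)


if __name__ == "__main__":
    rows = run()
    print(len(rows), rows[:2])
```

Two caveats apply. ProcessPoolExecutor has no close() or join() methods; its equivalent is shutdown(), which the with-block calls on exit. Also, every argument tuple is pickled and sent to a worker process, so passing large arrays with each task (as the original tuples do) can cost more than the calculation itself.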
Upvotes: 1