Reputation: 3
I used multiprocessing pool in Python3 to connect database(Postgresql).I created database connections with multiprocessing.Pool(8,init_dbconn),and do some dml with pool.map(update, records).Obviously I can get better performance if I do commit after pool.map rather than each iteration in pool.map().So I define another function to call dbconnection.commit,and run pool.map(func_commit, range(8), 1) after pool.map(update),but I found the pool don't distribute tasks to each child process at least one time, even I increase the size or range() to do more iteration. So some of database connection didn't commit the dml. Is there any method to achieve this function?
The pseudo-code like this:
def conn_db():
try:
conn = psycopg2.connect(database="dbname", user="username", password="pass", host="127.0.0.1", port="5432")
return (conn)
except:
print(traceback.format_exc())
def init_proc():
global conn
conn = conn_db()
global cursor
cursor = conn.cursor()
def double2(i):
print ("I'm process:%s, %s" % (os.getpid(), multiprocessing.current_process()))
return i*2
def exec_ten_parallel(num_parallel, records):
try:
pool = Pool(8, initializer=init_proc)
t1_b = time.time()
pool.imap(double2, range(16), 1)
#for i in range(16):
# pool.apply_async(double2,(1,))
pool.close()
pool.join()
t1_runtime = time.time() - t1_b
#print('update all count:', len(records), 'seconds:', t1_runtime)
except:
print(traceback.format_exc())
I print the worker name and pid, as shown below
I'm process:53141, <ForkProcess(ForkPoolWorker-1, started daemon)>
I'm process:53141, <ForkProcess(ForkPoolWorker-1, started daemon)>
I'm process:53142, <ForkProcess(ForkPoolWorker-2, started daemon)>
I'm process:53141, <ForkProcess(ForkPoolWorker-1, started daemon)>
I'm process:53141, <ForkProcess(ForkPoolWorker-1, started daemon)>
I'm process:53142, <ForkProcess(ForkPoolWorker-2, started daemon)>
I'm process:53141, <ForkProcess(ForkPoolWorker-1, started daemon)>
I'm process:53141, <ForkProcess(ForkPoolWorker-1, started daemon)>
[root@dc3-06-005 dml-test]# python3 bak-p.py
I'm process:53164, <ForkProcess(ForkPoolWorker-3, started daemon)>
I'm process:53164, <ForkProcess(ForkPoolWorker-3, started daemon)>
I'm process:53164, <ForkProcess(ForkPoolWorker-3, started daemon)>
I'm process:53164, <ForkProcess(ForkPoolWorker-3, started daemon)>
I'm process:53164, <ForkProcess(ForkPoolWorker-3, started daemon)>
I'm process:53164, <ForkProcess(ForkPoolWorker-3, started daemon)>
I'm process:53164, <ForkProcess(ForkPoolWorker-3, started daemon)>
I'm process:53164, <ForkProcess(ForkPoolWorker-3, started daemon)>
Almost could not run all ForkPoolWorker, just only call one or two worker do job.
Upvotes: 0
Views: 1021
Reputation: 17975
how about using multiprocessing.Barrier, you pass it in the initializer with the number of processes in the pool, then inside the function just call the wait()
method of it, just make sure the function has chunksize=1
and is called exactly the number of child processes, orelse everything will hang.
to avoid the hanging, you can pass a timeout, but you will also have to catch the error that will be raised if the timeout runs out ... so its not the safest approach, but totally doable.
from multiprocessing import Pool
import time
import traceback
import os
import multiprocessing
def double2(i):
global gbarrier
print ("I'm process:%s, %s" % (os.getpid(), multiprocessing.current_process()))
gbarrier.wait()
return i*2
def init_func(barrier):
global gbarrier
gbarrier = barrier
def exec_ten_parallel():
try:
barrier = multiprocessing.Barrier(8)
pool = Pool(8,initializer=init_func,initargs=(barrier,))
t1_b = time.time()
pool.imap(double2, range(8), chunksize=1)
#for i in range(16):
# pool.apply_async(double2,(1,))
pool.close()
pool.join()
t1_runtime = time.time() - t1_b
#print('update all count:', len(records), 'seconds:', t1_runtime)
except:
print(traceback.format_exc())
if __name__ == "__main__":
exec_ten_parallel()
I'm process:6396, <SpawnProcess name='SpawnPoolWorker-2' parent=12688 started daemon>
I'm process:13612, <SpawnProcess name='SpawnPoolWorker-4' parent=12688 started daemon>
I'm process:14848, <SpawnProcess name='SpawnPoolWorker-8' parent=12688 started daemon>
I'm process:6836, <SpawnProcess name='SpawnPoolWorker-1' parent=12688 started daemon>
I'm process:12632, <SpawnProcess name='SpawnPoolWorker-6' parent=12688 started daemon>
I'm process:1536, <SpawnProcess name='SpawnPoolWorker-3' parent=12688 started daemon>
I'm process:11856, <SpawnProcess name='SpawnPoolWorker-5' parent=12688 started daemon>
I'm process:13888, <SpawnProcess name='SpawnPoolWorker-7' parent=12688 started daemon>
if you want a safer approach, then perhaps for the normal runtime have a counter on every process that will commit every 100 operations or something, and only call this dangerous function when you absolutely have to, in short it should work out of the box, but inside the production environment you must handle the "one time it will fail because a cosmic ray crashed a process before it reached the barrier" sort of error, otherwise you must make your own multiprocess.Pool
using multiple multiprocess.Process
Edit: moved the wait to the end of function instead of its start, for faster out-of-order execution, also imap is fine if you don't want to block the main process.
Edit2: in order to make the barrier safe to use, you have to join the threads before attempting this command, and you have to pass a timeout, and wrap it in a try block, and catch the timeout error in each process, then each process will return a success flag, and the main process will then detect the command failure and join the pool again before redoing this command again, this is how it should be done in a production environment to avoid failure, its failure should happen once every few months/years, but this is the way to solve it.
Upvotes: 2