StefanS

Reputation: 271

Problems using multiprocessing

Maybe you can help me find my error in setting up a multiprocessing function. I have a worker function that fetches data (type float) and computes an average. If I use the following code (with join()), no multiprocessing takes place: the loop iterations run one after another. The computed values are correct.

The opposite happens when I remove the join() call: parallel processing starts, but errors occur and most of the computed data is identical. It seems that the worker processes do not use their own lists and variables. Can you please give me a hint? Thank you. Stefan

import multiprocessing

for hostgroup in hostgroups:
    jobs = []
    #multiprocessing.log_to_stderr(logging.DEBUG)
    p = multiprocessing.Process(target=worker, args=(hostgroup, hostsfile, mod_inputfile, outputdir, testmode, backup_dir, start_time, end_time, rrdname, unit, yesterday, now_epoch, rrd_interval, rrd_heartbeat, name))
    jobs.append(p)
    p.start()
    p.join()  # blocks until this process finishes, so the loop runs serially

UPDATE: this works with Pool, but still not in parallel:

number_of_processes = len(hostgroups)
if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=number_of_processes)
    for hostgroup in hostgroups:
        result = pool.apply_async(worker, [hostgroup, hostsfile, mod_inputfile, outputdir, testmode, backup_dir, start_time, end_time, rrdname, unit, yesterday, now_epoch, rrd_interval, rrd_heartbeat, name])
        print(result.get(timeout=30))  # get() blocks, so each task finishes before the next is submitted

UPDATE: this seems to run in parallel, but only some processes finish correctly (a different set every time):

number_of_processes = len(hostgroups)
if __name__ == '__main__':
    pool = multiprocessing.Pool()
    results = []
    for hostgroup in hostgroups:
        results.append(pool.apply_async(worker, [hostgroup, hostsfile, mod_inputfile, outputdir, testmode, backup_dir, start_time, end_time, rrdname, unit, yesterday, now_epoch, rrd_interval, rrd_heartbeat, name]))
    pool.close()
    pool.join()
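
With apply_async, an exception raised inside a worker is only re-raised when get() is called on its AsyncResult, so failed tasks can pass silently. A minimal sketch for surfacing them, continuing inside the if __name__ == '__main__': block above and using the results list it collected:

    # After pool.join() every task is finished, so get() returns
    # immediately; any exception a worker raised is re-raised here.
    for hostgroup, r in zip(hostgroups, results):
        try:
            r.get()
        except Exception as exc:
            print('worker for %s failed: %s' % (hostgroup, exc))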

Upvotes: 1

Views: 321

Answers (1)

Ryan Ye

Reputation: 3259

p.join() will block your main thread until the process finishes its work. To get real parallelism, you need to kick off all the jobs before you call join().

Code Example

jobs = []
for hostgroup in hostgroups:
    p = multiprocessing.Process(target=worker, args=(hostgroup, hostsfile, mod_inputfile, outputdir, testmode, backup_dir, start_time, end_time, rrdname, unit, yesterday, now_epoch, rrd_interval, rrd_heartbeat, name))
    jobs.append(p)
    p.start()   # start every process first...
for p in jobs:
    p.join()    # ...then wait for all of them
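
For illustration, here is a self-contained toy version of the same pattern; the worker and the data below are made up for this sketch and are not taken from the question:

import multiprocessing

def worker(name, values):
    # Each process receives its own copy of its arguments, so no
    # lists or variables are shared between workers.
    avg = sum(values) / len(values)
    print('%s: average = %f' % (name, avg))

if __name__ == '__main__':
    data = {'groupA': [1.0, 2.0, 3.0], 'groupB': [4.0, 5.0, 6.0]}
    jobs = []
    for name, values in data.items():
        p = multiprocessing.Process(target=worker, args=(name, values))
        jobs.append(p)
        p.start()      # start every process first...
    for p in jobs:
        p.join()       # ...then wait for all of them

Starting all processes before joining any of them lets them run concurrently; the joins then only wait for the stragglers.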

Upvotes: 2
