Reputation: 688
I'm working with OrcaFlex (FEM software for offshore analysis, though that should not be relevant). I created a script to check whether the simulations I've run completed successfully (a simulation can fail if it does not reach convergence). Since I'm dealing with thousands of files, I am trying to parallelize the process with `multiprocessing`.
My code follows. Sorry, I can't produce a working example for you, but I'll try to explain in detail. I created a class derived from `multiprocessing.Process` and overrode `run()` to perform the checks on the simulation files.
Then, in `__main__`, I set the number of processes, split the files accordingly, and start the execution.
The problem is that the processes do not all spawn together but, apparently, at random intervals from one another. Is this the expected behaviour, or am I missing something? What I mean by not spawning together is that I see:
[Info/Worker-1] child process calling self.run()
and for example:
[Info/Worker-4] child process calling self.run()
after about 10 min of the program running.
Thanks in advance for any help/suggestion.
import os
import subprocess
import glob
import multiprocessing
import logging
import sys
import OrcFxAPI as of
class Worker(multiprocessing.Process):
    """Process that checks a batch of OrcaFlex simulation files.

    Assign the batch with :meth:`setJobs` before calling ``start()``.
    ``run()`` then loads every simulation in turn and, for each one whose
    model state is ``SimulationStoppedUnstable``, halves the implicit time
    step stored in the matching ``.dat`` file.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Per-instance list: a class-level ``myJobs = []`` is a single object
        # shared by every Worker until ``setJobs`` rebinds it.
        self.myJobs = []

    def setJobs(self, jobList):
        """Store the simulation file names this worker has to check.

        Args:
            jobList: iterable of simulation file names (``*.sim``).
        """
        # Copy so later changes to the caller's list cannot alter this batch.
        self.myJobs = list(jobList)

    @staticmethod
    def changedExtensionFileName(oldFileName, newExtension):
        """Return *oldFileName* with its extension replaced.

        Args:
            oldFileName: file name or path whose extension is replaced.
            newExtension: new extension, without the leading dot.

        Returns:
            The name with everything after the last extension dot replaced
            by *newExtension*.
        """
        return '.'.join((os.path.splitext(oldFileName)[0], newExtension))

    def run(self):
        """Check every assigned simulation; executed in the child process.

        For each job the simulation is loaded. If the model state is
        ``SimulationStoppedUnstable`` the matching ``.dat`` name is appended
        to ``Failed_Sim.txt``, the data file is loaded, its
        ``ImplicitConstantTimeStep`` is halved and the file is saved back.
        An ``of.DLLError`` for a job is reported on stdout and written to a
        ``<job>.FAIL`` file; the loop then carries on with the next job.
        """
        if not self.myJobs:
            # Nothing to check: do not create an OrcaFlex model at all.
            return

        model = of.Model(threadCount=1)
        for job in self.myJobs:
            try:
                print('%s starting' % job, flush=True)
                model.LoadSimulation(job)
                if model.state == of.ModelState.SimulationStoppedUnstable:
                    # Swap only the extension; ``job.replace('.sim', '.dat')``
                    # would also rewrite a '.sim' found elsewhere in the path.
                    newJob = self.changedExtensionFileName(job, 'dat')
                    # NOTE(review): every worker process appends to this same
                    # file; returning the names to the parent process would
                    # rule out interleaved writes.
                    with open('Failed_Sim.txt', 'a') as f:
                        f.write(f'{newJob}\n')
                    model.LoadData(newJob)
                    model.general.ImplicitConstantTimeStep /= 2
                    model.SaveData(newJob)
                    print(f'{job} has failed, reducing time step', flush=True)
            except of.DLLError as err:
                print('%s ERROR: %s' % (job, err), flush=True)
                with open(self.changedExtensionFileName(job, 'FAIL'), 'w') as f:
                    f.write('%s error: %s' % (job, err))
if __name__ == '__main__':
    import re

    # Simulation files of the current directory: names that contain four
    # consecutive digits and end in ".sim". The dot is escaped and the
    # pattern anchored, so names such as "0001.simlog" or "0001_sim" are no
    # longer picked up by accident.
    sim_file = [f for f in os.listdir() if re.search(r'\d{4}.*\.sim$', f)]

    # begin multiprocessing
    multiprocessing.log_to_stderr()
    logger = multiprocessing.get_logger()
    logger.setLevel(logging.INFO)

    corecount = 14
    workers = []
    chunkSize, chunkRemainder = divmod(len(sim_file), corecount)
    print('%s jobs found, dividing across %s workers - %s each remainder %s' % (str(len(sim_file)), str(corecount), chunkSize, chunkRemainder))

    start = 0
    for coreNum in range(corecount):
        # The first ``chunkRemainder`` workers take one extra file each, so
        # every file is assigned exactly once.
        end = start + chunkSize + (1 if coreNum < chunkRemainder else 0)
        if end == start:
            # No files left: do not start a process that has nothing to do.
            break
        worker = Worker()
        worker.setJobs(sim_file[start:end])
        worker.start()
        workers.append(worker)
        start = end

    # Wait for every started worker before reporting completion.
    for worker in workers:
        worker.join()
    print('Done...')
Upvotes: 1
Views: 247
Reputation: 1275
OK, so no one has put their hand up to answer this with a minor tweak (which I don't know how to do!), so here comes a larger rework proposal...
def worker(inpData):
    """Check one chunk of simulation files inside a pool worker process.

    Each simulation is loaded. If its model state is
    ``SimulationStoppedUnstable`` the matching ``.dat`` file is loaded, its
    ``ImplicitConstantTimeStep`` is halved and the file is saved back.

    Nothing is printed or written to a shared text file here. Both failure
    lists are returned to the parent process instead, so that several
    processes never update the same report file at once.

    Args:
        inpData: sequence of simulation file names (``*.sim``).

    Returns:
        A tuple ``(failed1, failed2)``: ``failed1`` holds the ``.dat`` names
        of unstable simulations whose time step was reduced, ``failed2`` the
        ``.sim`` names for which an ``of.DLLError`` was raised.
    """
    failed1 = []
    failed2 = []
    if not inpData:
        # Empty chunk: skip creating an OrcaFlex model altogether.
        return failed1, failed2

    # The model has to be created inside the worker process; the snippet
    # previously used ``model`` without ever defining it.
    model = of.Model(threadCount=1)
    for job in inpData:
        try:
            model.LoadSimulation(job)
            if model.state == of.ModelState.SimulationStoppedUnstable:
                # Swap only the extension; ``str.replace`` would also rewrite
                # a '.sim' found elsewhere in the path.
                newJob = os.path.splitext(job)[0] + '.dat'
                failed1.append(newJob)
                model.LoadData(newJob)
                model.general.ImplicitConstantTimeStep /= 2
                model.SaveData(newJob)
        except of.DLLError:
            failed2.append(job)
    return failed1, failed2
if __name__ == "__main__":
    import re
    import multiprocessing as mp

    nCPUs = mp.cpu_count()

    # Simulation files of the current directory: names that contain four
    # consecutive digits and end in ".sim" (dot escaped, pattern anchored).
    sim_file = [f for f in os.listdir() if re.search(r'\d{4}.*\.sim$', f)]

    # Make the chunks: the first ``chunkRemainder`` chunks take one extra
    # file each, and ``start`` advances so that no file is handed out twice.
    chunkSize, chunkRemainder = divmod(len(sim_file), nCPUs)
    print('%s jobs found, dividing across %s workers - %s each remainder %s' % (str(len(sim_file)), str(nCPUs), chunkSize, chunkRemainder))
    chunks = []
    start = 0
    for iChunk in range(nCPUs):
        end = start + chunkSize + (1 if iChunk < chunkRemainder else 0)
        if end > start:
            # Skip empty chunks (fewer files than CPUs).
            chunks.append(sim_file[start:end])
        start = end

    failedDat = []
    failedSim = []
    if chunks:
        # Send to workers. All results are collected inside the ``with``
        # block, because leaving it terminates the pool.
        with mp.Pool(processes=len(chunks)) as pool:
            futA = [pool.apply_async(worker, args=(chunk,)) for chunk in chunks]
            # Gather results
            for fut in futA:
                resA, resB = fut.get()
                failedDat.extend(resA)
                failedSim.extend(resB)

    if failedDat:
        print("Following jobs failed, reducing timesteps:")
        print(failedDat)
    if failedSim:
        print("Following sims failed due to errors")
        print(failedSim)
Upvotes: 1