Giovanni Frison

Reputation: 688

Multiprocessing not spawning all the requested processes

I'm working with OrcaFlex (FEM software for offshore analysis, but that should not be relevant here). I created a script to check whether the simulations I've performed completed successfully (a simulation can fail by not reaching convergence). Since I'm talking about thousands of files, I tried to parallelize the process with multiprocessing. My code follows. Sorry, I can't produce a working example for you, but I'll try to explain it in detail. I created a class derived from multiprocessing.Process and overrode run() to perform the checks on the simulation files. Then, in __main__, I set a number of processes, split the files accordingly, and start the execution.

The problem is that the processes do not all spawn together but at what appear to be random intervals from one to the next. Is this how it is supposed to work, or am I missing something? What I mean by not spawning together is that I see:

[Info/Worker-1] child process calling self.run()

and for example:

[Info/Worker-4] child process calling self.run()

only after the program has been running for about 10 minutes.

Thanks in advance for any help/suggestion.

import os
import subprocess
import glob
import multiprocessing
import logging
import sys
import OrcFxAPI as of

class Worker(multiprocessing.Process):

    myJobs = []

    def setJobs(self, jobList):
        self.myJobs = jobList

    @staticmethod
    def changedExtensionFileName(oldFileName, newExtension):
        return '.'.join((os.path.splitext(oldFileName)[0], newExtension))

    def run(self):
        failed = []
        model = of.Model(threadCount=1)

        for job in self.myJobs:
            try:
                print('%s starting' % job)
                sys.stdout.flush()
                model.LoadSimulation(job)
                if model.state == of.ModelState.SimulationStoppedUnstable:
                    newJob = job.replace('.sim', '.dat')
                    failed.append(newJob)

                    with open('Failed_Sim.txt', 'a') as f:
                        f.write(f'{newJob}\n')
                        f.close()

                    model.LoadData(newJob)
                    model.general.ImplicitConstantTimeStep /= 2
                    model.SaveData(newJob)
                    print(f'{job} has failed, reducing time step')

            except of.DLLError as err:
                print('%s ERROR: %s' % (job, err))
                sys.stdout.flush()
                with open(self.changedExtensionFileName(job, 'FAIL'), 'w') as f:
                    f.write('%s error: %s' % (job, err))
                    f.close()
        return



if __name__ == '__main__':
    import re
    sim_file = [f for f in os.listdir() if re.search(r'\d\d\d\d.*.sim', f)]    

    # begin multprocessing
    multiprocessing.log_to_stderr()
    logger = multiprocessing.get_logger()
    logger.setLevel(logging.INFO)

    corecount = 14 

    workers = []

    chunkSize = int(len(sim_file) / corecount)
    chunkRemainder = int(len(sim_file) % corecount)
    print('%s jobs found, dividing across %s workers - %s each remainder %s' % (str(len(sim_file)), str(corecount), chunkSize, chunkRemainder))

    start = 0
    for coreNum in range(0, corecount):
        worker = Worker()
        workers.append(worker)
        end = start + chunkSize
        if chunkRemainder>0:
            chunkRemainder -= 1
            end += 1
        if end>len(sim_file):
            end = len(sim_file)
        worker.setJobs(sim_file[start:end])
        worker.start()
        start = end
        if start>=len(sim_file):
            break

    for worker in workers:
        worker.join()
    print('Done...')

Upvotes: 1

Views: 247

Answers (1)

Amiga500

Reputation: 1275

OK, so no one has put their hand up to answer this with a minor tweak (which I don't know how to do!), so here comes the larger rejig proposal...

import os
import OrcFxAPI as of   # needed for of.Model, of.ModelState and of.DLLError, as in the question's script


def worker(inpData):
    #The worker process

    failed1 = []
    failed2 = []
    model = of.Model(threadCount=1)   # the model object was missing here; created as in the question's run()

    for job in inpData:   #I'm not sure of the data shape of the chunks, has your original method split them into coherent chunks capable of being processed independently? My step here could be wrong. 
        try:
            #print('%s starting' % job)  #Prints won't appear on console from worker processes from windows, so commented them all out
            model.LoadSimulation(job)
            if model.state == of.ModelState.SimulationStoppedUnstable:
                newJob = job.replace('.sim', '.dat')
                failed1.append(newJob)

                #I'd recommend we pass the list "failed" back to master and write to text from there, otherwise you could have several processes updating the text file at once, leading to possible loss of data
                #with open('Failed_Sim.txt', 'a') as f:
                #     f.write(f'{newJob}\n')
                #     f.close()

                model.LoadData(newJob)
                model.general.ImplicitConstantTimeStep /= 2
                model.SaveData(newJob)
                #print(f'{job} has failed, reducing time step')   

        except of.DLLError as err:
            #print('%s ERROR: %s' % (job, err))
            #sys.stdout.flush()
            #with open(self.changedExtensionFileName(job, 'FAIL'), 'w') as f:
            #    f.write('%s error: %s' % (job, err))
            #    f.close()
            failed2.append(job)

    #Note I've made two failed lists to pass back, for both failure types
    return failed1, failed2


if __name__ == "__main__":
    import re
    import multiprocessing as mp
    nCPUs = mp.cpu_count()

    sim_file = [f for f in os.listdir() if re.search(r'\d\d\d\d.*.sim', f)] 

    #Make the chunks
    chunkSize = int(len(sim_file) / nCPUs)
    chunkRemainder = int(len(sim_file) % nCPUs)
    print('%s jobs found, dividing across %s workers - %s each, remainder %s' % (len(sim_file), nCPUs, chunkSize, chunkRemainder))

    chunks = []
    start = 0
    for iChunk in range(0, nCPUs):
        end = start + chunkSize
        if chunkRemainder > 0:
            chunkRemainder -= 1
            end += 1
        if end > len(sim_file):
            end = len(sim_file)
        chunks.append(sim_file[start:end])
        start = end


    #Send to workers
    pool = mp.Pool(processes=nCPUs)
    futA = []

    for iChunk in range(0, nCPUs):
        futA.append(pool.apply_async(worker, args=(chunks[iChunk],)))
    

    #Gather results
    failedDat = []
    failedSim = []
    for iChunk in range(0, len(futA)):
        resA, resB = futA[iChunk].get()   # blocks until that chunk's worker has finished
        failedDat.extend(resA)
        failedSim.extend(resB)
    pool.close()
    pool.join()
    if failedDat:
        print("Following jobs failed, reducing timesteps:")
        print(failedDat)
    if failedSim:
        print("Following sims failed due to errors")
        print(failedSim) 
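As suggested in the comments inside worker(), the failure report is best written once from the parent, so only a single process ever touches the output files. A minimal sketch of that final step, appended to the __main__ block above, reusing the Failed_Sim.txt name and the .FAIL marker idea from the question (the worker only returns job names, not the error text, so the marker can only record that the job failed):

    # Parent-side reporting: only one process writes to disk
    if failedDat:
        with open('Failed_Sim.txt', 'a') as f:
            for newJob in failedDat:
                f.write(f'{newJob}\n')

    for job in failedSim:
        # one .FAIL marker per job that raised a DLLError, as in the original script
        with open(os.path.splitext(job)[0] + '.FAIL', 'w') as f:
            f.write(f'{job} failed\n')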

Upvotes: 1
