katamatsu

Reputation: 235

Python multiprocessing - Is it possible to introduce a fixed time delay between individual processes?

I have searched and cannot find an answer to this question elsewhere. Hopefully I haven't missed something.

I am trying to use Python multiprocessing to batch run some proprietary models in parallel. I have, say, 200 simulations, and I want to run them ~10-20 at a time. My problem is that the proprietary software crashes if two models happen to start at the same or nearly the same time. I need to introduce a delay between the processes spawned by multiprocessing so that each new model run waits a little before starting.

So far, my solution has been to introduce a random time delay at the start of the child process before it fires off the model run. However, this only reduces the probability of two runs starting at the same time, so I still run into problems when processing a large number of models. I think the delay needs to be built into the multiprocessing part of the code, but I haven't been able to find any documentation or examples of this.
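To put a number on "only reduces the probability": if n runs each pick a start time uniformly at random in [0, T], the chance that every pair starts at least g seconds apart is (1 - (n-1)g/T)^n, a standard result on uniform spacings that holds while (n-1)g <= T. The sketch below evaluates that formula and cross-checks it with a small simulation. It is a simplified model, not a description of the real runs: the 12 simultaneous workers, the 120 s window taken from the code below, and especially the 1 s minimum gap are illustrative assumptions.

```python
import random


def p_no_clash(n, window, gap):
    """Chance that n start times drawn uniformly from [0, window] are all
    at least `gap` seconds apart (closed form for uniform spacings)."""
    span = (n - 1) * gap
    if span > window:
        return 0.0  # impossible to fit n starts with that spacing
    return (1.0 - float(span) / window) ** n


def simulate(n, window, gap, trials=20000, seed=0):
    """Monte Carlo estimate of the same probability, as a cross-check."""
    rng = random.Random(seed)
    ok = 0
    for _ in range(trials):
        starts = sorted(rng.uniform(0, window) for _ in range(n))
        if all(b - a >= gap for a, b in zip(starts, starts[1:])):
            ok += 1
    return ok / float(trials)


if __name__ == "__main__":
    # Assumed numbers: 12 concurrent workers, random delay of up to 120 s,
    # and a hypothetical 1 s minimum gap between model starts.
    print("closed form: %.3f" % p_no_clash(12, 120.0, 1.0))
    print("simulated:   %.3f" % simulate(12, 120.0, 1.0))
```

With these assumed numbers the closed form is about 0.32, so under this simplified model roughly 68% of batches would have at least one pair of runs starting less than a second apart, and the chance of a clash grows quickly with n.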

Edit: I am using Python 2.7

This is my code so far:

from time import sleep
import numpy as np
import subprocess
import multiprocessing

def runmodels(arg):
    sleep(np.random.rand() * 120) # this is my interim solution to reduce the probability that any two runs start at the same time, but it isn't a guaranteed solution
    subprocess.call(arg) # this line actually fires off the model run

if __name__ == '__main__':

    arguments = [...]  # big list of runs in here

    count = 12
    pool = multiprocessing.Pool(processes = count)
    r = pool.imap_unordered(runmodels, arguments)
    pool.close()
    pool.join()

Upvotes: 9

Views: 7861

Answers (3)

Jitse

Reputation: 101

The answer suggested by jfs caused problems for me because it starts a new thread with threading.Timer. If the worker happens to finish before the timer fires, the timer is killed and the lock is never released.

I propose an alternative in which each successive worker waits until enough time has passed since the start of the previous one. This seems to have the same desired effect, without relying on an extra timer thread.

import multiprocessing as mp
import time

def init(shared_val):
    global start_time
    start_time = shared_val  # earliest time (epoch seconds) the next run may start

def run_model(arg):
    with start_time.get_lock():
        wait_time = max(0, start_time.value - time.time())
        time.sleep(wait_time)
        start_time.value = time.time() + 1.0  # specify the interval here
    # ... start your simulation here

if __name__ == "__main__":
    arguments = ...
    pool = mp.Pool(processes=12,
                   initializer=init, initargs=[mp.Value('d')])
    for _ in pool.imap_unordered(run_model, arguments):
        pass
    pool.close()
    pool.join()
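The guarantee here comes from doing the check, the sleep and the update while holding the lock, so consecutive start times can never be closer than the interval. The sketch below replays that rule with a simulated clock instead of real processes, which makes the spacing easy to check without waiting in real time. The function name `schedule_starts` and the sample arrival times are hypothetical, chosen only for illustration.

```python
def schedule_starts(arrivals, interval):
    """Replay the shared-Value rule with a simulated clock.

    `arrivals` are the times at which workers reach the lock, in the order
    they acquire it. Each worker sleeps until `next_allowed`, records its
    start, then pushes `next_allowed` one interval past that start.
    """
    next_allowed = 0.0  # mp.Value('d') is initialised to 0.0
    starts = []
    for now in arrivals:
        wait_time = max(0.0, next_allowed - now)
        start = now + wait_time
        starts.append(start)
        next_allowed = start + interval
    return starts


if __name__ == "__main__":
    # Hypothetical scenario: twelve workers reach the lock at t=0, one more at t=30.
    print(schedule_starts([0.0] * 12 + [30.0], 1.0))
```

In this scenario the twelve simultaneous arrivals are pushed to 0, 1, ..., 11 seconds, and the late arrival at 30 s starts immediately because the shared value is already in the past. Since the initial value 0.0 is also in the past, the very first run never waits.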

Upvotes: 0

Quentin Rouland

Reputation: 134

One way to do this is with threads and a semaphore:

from time import sleep
import subprocess
import threading


def runmodels(arg):
    try:
        subprocess.call(arg)
    finally:
        sGlobal.release()  # release the slot for the next launch, even if the call failed


if __name__ == '__main__':
    threads = []
    sGlobal = threading.Semaphore(12)  # semaphore allowing at most 12 threads
    arguments = [...]  # big list of runs in here
    for arg in arguments:
        sGlobal.acquire()  # block while 12 threads are already running
        t = threading.Thread(target=runmodels, args=(arg,))
        threads.append(t)
        t.start()
        sleep(1)  # wait a second before the next launch

    for t in threads:
        t.join()
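This approach relies on two properties: never more than 12 runs at once, and launches at least a second apart. To make both checkable, the sketch below wraps the same semaphore-plus-sleep loop in a function that takes the launcher as a parameter and records launch times and peak concurrency. `launch_staggered`, its parameters and the stand-in `fake_model` are hypothetical; in real use `launch` would be `subprocess.call`.

```python
import threading
import time


def launch_staggered(arguments, launch, max_running=12, delay=1.0):
    """Semaphore-plus-sleep launcher, instrumented for checking.

    Returns (start_times, peak): when each launch began, and the largest
    number of launch() calls that were running at the same moment.
    """
    slots = threading.BoundedSemaphore(max_running)
    state_lock = threading.Lock()
    state = {"running": 0, "peak": 0}
    start_times = []

    def worker(arg):
        with state_lock:
            state["running"] += 1
            state["peak"] = max(state["peak"], state["running"])
        try:
            launch(arg)
        finally:
            with state_lock:
                state["running"] -= 1
            slots.release()  # free the slot even if launch() raised

    threads = []
    for arg in arguments:
        slots.acquire()  # blocks while max_running launches are active
        start_times.append(time.time())
        t = threading.Thread(target=worker, args=(arg,))
        threads.append(t)
        t.start()
        time.sleep(delay)  # fixed gap before the next launch
    for t in threads:
        t.join()
    return start_times, state["peak"]


if __name__ == "__main__":
    def fake_model(arg):
        time.sleep(0.2)  # hypothetical stand-in for subprocess.call(arg)

    starts, peak = launch_staggered(range(6), fake_model, max_running=2, delay=0.1)
    print("peak concurrency:", peak)
```

The counter is decremented before the semaphore slot is released, so the recorded peak can never exceed `max_running`.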

Upvotes: 1

jfs

Reputation: 414179

multiprocessing.Pool() already limits the number of processes running concurrently.

You could use a lock to separate the start times of the processes (not tested):

import threading
import multiprocessing

def init(lock):
    global starting
    starting = lock

def run_model(arg):
    starting.acquire() # no other process can get it until it is released
    threading.Timer(1, starting.release).start() # release in a second
    # ... start your simulation here

if __name__=="__main__":
    arguments = ...
    pool = multiprocessing.Pool(processes=12,
                                initializer=init, initargs=[multiprocessing.Lock()])
    for _ in pool.imap_unordered(run_model, arguments):
        pass
    pool.close()
    pool.join()
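One way to keep this lock-based idea without a separate timer thread is to do the waiting in the worker itself: take the lock, start the model without waiting for it (`subprocess.Popen`), sleep for the gap, release the lock, and only then wait for the model to finish. The next worker cannot launch its model until the lock is released, so consecutive launches are at least `delay` seconds apart. This is a sketch under assumptions, not tested against the proprietary software: the placeholder commands and the one-second default gap are hypothetical.

```python
import multiprocessing
import subprocess
import sys
import time


def init(lock):
    global starting
    starting = lock


def run_model(arg, delay=1.0):
    # Only one worker at a time may be in the "starting" phase.
    with starting:
        proc = subprocess.Popen(arg)  # launch without waiting for it to finish
        time.sleep(delay)             # hold the lock so the next launch is delayed
    return proc.wait()                # then wait for this model outside the lock


if __name__ == "__main__":
    # Hypothetical placeholder commands standing in for the real model runs.
    arguments = [[sys.executable, "-c", "print(%d)" % i] for i in range(4)]
    pool = multiprocessing.Pool(processes=2,
                                initializer=init,
                                initargs=[multiprocessing.Lock()])
    try:
        print(pool.map(run_model, arguments))
    finally:
        pool.close()
        pool.join()
```

Because the lock is released by the same worker that acquired it, and the `with` block releases it even if `Popen` raises, no other thread has to outlive the worker for the lock to become free again.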

Upvotes: 7
