Hunnam

Reputation: 117

Python multiprocessing - starting next process after several seconds

I am a beginner in Python, so I would very much appreciate clear and easy explanations.

In my Python script, I have a function that spawns several threads to do an I/O-bound task (what it actually does is make several Azure requests concurrently using the Azure Python SDK). I also have a list of time intervals like [1 second, 3 seconds, 10 seconds, 5 seconds, ..., 7 seconds], so that I execute the function again after each interval.

Let's say I want to execute the function and then execute it again after 5 seconds. The first execution can take much longer than 5 seconds to finish, since it has to wait for the requests it makes to complete. So I want to run each execution in a separate process, so that different executions of the function do not block each other. (Even if they wouldn't block each other without separate processes, I just didn't want threads from different executions to get mixed up.)

My code is like:

import multiprocessing as mp
from time import sleep

def function(num_threads):
    """
    This functions makes num_threads number of threads to make num_threads number of requests 
    """

# Time to wait in seconds between each execution of the function
times = [1, 10, 7, 3, 13, 19]

# List of number of requests to make for each execution of the function
num_threads_list = [1, 2, 3, 4, 5, 6]

processes = []

for i in range(len(times)):
    p = mp.Process(target=function, args=[num_threads_list[i]])
    p.start()
    processes.append(p)

    sleep(times[i])

for process in processes:
    process.join()

Questions I have are:

  1. The list "times" is very long in my real script (1000 entries). Given the time differences in "times", I estimate that at most 5 executions of the function run concurrently. Does each process terminate once its function finishes, so that at most about 5 processes are actually alive at any time? Or do all 1000 processes stick around, which sounds very strange given the number of CPU cores of my computer?

  2. Please tell me if you think there is a better way to do what I explained above.

Thank you!

Upvotes: 0

Views: 2942

Answers (2)

Laurent B.

Reputation: 2263

You could also use a timer to do the job, as in the following code. I deliberately gave process 2 a 15-second delay so you can see that it effectively finishes last once its time has elapsed.

This code sample has two main functions. The first one, your_process_here(), is, as its name says, a placeholder for your own code. The second one is a manager that organizes the slicing of the processes so as not to overload the system.

Parameters

max_process : total number of processes to be executed by the script

simultp : maximum number of simultaneous processes

timegl : time guideline, which defines the waiting time for each process, measured from the parent's start time. So the waiting time is at least the time defined in the guideline.

In other words, once its guideline time has elapsed, a process starts as soon as possible, subject to the maximum number of simultaneous processes allowed.

In this example

max_process = 6

simultp = 3

timegl = [1, 15, 1, 0.22, 6, 0.5] (just for illustration; an increasing series would be more logical there)

Result in the shell

simultaneously launched processes : 3

process n°2 is active and will wait 14.99 seconds more before treatment function starts

process n°1 is active and will wait 0.98 seconds more before treatment function starts

process n°3 is active and will wait 0.98 seconds more before treatment function starts

---- process n°1 ended ----

---- process n°3 ended ----

simultaneously launched processes : 3

process n°5 is active and will wait 2.88 seconds more before treatment function starts

process n°4 is active and will start now

---- process n°4 ended ----

---- process n°5 ended ----

simultaneously launched processes : 2

process n°6 is active and will start now

---- process n°6 ended ----

---- process n°2 ended ----

Code

import multiprocessing as mp
from threading import Timer
import time


def your_process_here(starttime, pnum, timegl):
    # Delay since the parent process started
    delay_since_pstart = time.time() - starttime
    # Time to sleep in order to follow the time guideline as closely as possible
    diff = timegl[pnum-1] - delay_since_pstart
    if diff > 0:  # if time elapsed since parent start < guideline time
        print('process n°{0} is active and will wait {1} seconds more before treatment function starts'\
              .format(pnum, round(diff, 2)))
        time.sleep(diff)  # wait for X more seconds
    else:
        print('process n°{0} is active and will start now'.format(pnum))
    ########################################################
    ## PUT THE CODE AFTER SLEEP() TO START CODE WITH A DELAY
    ## if pnum == 1:
    ##     function1()  
    ## elif pnum == 2:
    ##     function2()
    ## ...
    print('---- process n°{0} ended ----'.format(pnum))

def process_manager(max_process, simultp, timegl, starttime=0, pnum=1, launchp=[]):
    # While your number of simultaneous current processes is less than simultp and
    # the historical number of processes is less than max_process
    while len(mp.active_children()) < simultp and len(launchp) < max_process:
        # Incrementation of the process number
        pnum = len(launchp) + 1
        # Start a new process
        mp.Process(target=your_process_here, args=(starttime, pnum, timegl)).start()
        # Historical of all launched unique processes
        launchp = list(set(launchp + mp.active_children()))        
    # ...
    ####### THE 2 FOLLOWING LINES SHOULD BE DELETED IN OPERATIONAL CODE #########
    print('simultaneously launched processes : ', len(mp.active_children()))
    time.sleep(3)  # optional: a 3-second break before the next slice of processes is treated
    ##############################################################################
    if pnum < max_process:
        delay_repeat = 0.1 # 100 ms
        # If all the processes have not been launched renew the operation
        Timer(delay_repeat, process_manager, (max_process, simultp, timegl, starttime, pnum, launchp)).start()        

if __name__ == '__main__':
    max_process = 6 # maximum of processes
    simultp = 3 # maximum of simultaneous processes to save resources
    timegl = [1, 15, 1, 0.22, 6, 0.5] # Time guideline
    starttime = time.time()
    process_manager(max_process, simultp, timegl, starttime)

Upvotes: 0

Mace

Reputation: 1410

The main problem I distill from your question is having a large number of processes running simultaneously.

You can prevent that by maintaining a list of processes with a maximum length. Something like this:

import multiprocessing as mp
from time import sleep
from random import randint


def function(num_threads):
    """
    This functions makes num_threads number of threads to make num_threads number of requests
    """
    sleep(randint(3, 7))


# Time to wait in seconds between each execution of the function
times = [1, 10, 7, 3, 13, 19]

# List of number of requests to make for each execution of the function
num_threads_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

process_data_list = []
max_processes = 4


# =======================================================================================
def main():
    times_index = 0
    while times_index < len(times):

        # cleanup stopped processes -------------------------------
        cleanup_done = False
        while not cleanup_done:
            cleanup_done = True
            # search stopped processes
            for i, process_data in enumerate(process_data_list):
                if not process_data[1].is_alive():
                    print(f'process {process_data[0]} finished')
                    # remove from processes
                    p = process_data_list.pop(i)
                    del p
                    # start new search
                    cleanup_done = False
                    break

        # try start new process ---------------------------------
        if len(process_data_list) < max_processes:
            process = mp.Process(target=function, args=[num_threads_list[times_index]])
            process.start()
            process_data_list.append([times_index, process])
            print(f'process {times_index} started')
            times_index += 1
        else:
            sleep(0.1)

    # wait for all processes to finish --------------------------------
    while process_data_list:
        for i, process_data in enumerate(process_data_list):
            if not process_data[1].is_alive():
                print(f'process {process_data[0]} finished')
                # remove from processes
                p = process_data_list.pop(i)
                del p
                # start new search
                break

    print('ALL DONE !!!!!!')


# =======================================================================================
if __name__ == '__main__':
    main()

It runs at most max_processes processes at once, as you can see in the result.

process 0 started
process 1 started
process 2 started
process 3 started
process 3 finished
process 4 started
process 1 finished
process 5 started
process 0 finished
process 2 finished
process 5 finished
process 4 finished
ALL DONE !!!!!!

Upvotes: 1
