Reputation: 117
I am a beginner in Python, so I would really appreciate clear and easy explanations.
In my Python script, I have a function that creates several threads to do an I/O-bound task (what it really does is make several Azure requests concurrently using the Azure Python SDK). I also have a list of time differences like [1 second, 3 seconds, 10 seconds, 5 seconds, ..., 7 seconds], and I execute the function again after each time difference.
Let's say I want to execute the function and then execute it again after 5 seconds. The first execution can take much longer than 5 seconds to finish, because it has to wait for the requests it makes to complete. So I want to run each execution of the function in a separate process so that the executions do not block each other (even if they would not block each other without separate processes, I just didn't want threads from different executions to be mixed).
My code looks like this:
import multiprocessing as mp
from time import sleep

def function(num_threads):
    """
    This function creates num_threads threads to make num_threads requests
    """

# Time to wait in seconds between each execution of the function
times = [1, 10, 7, 3, 13, 19]
# List of number of requests to make for each execution of the function
num_threads_list = [1, 2, 3, 4, 5, 6]

processes = []
for i in range(len(times)):
    p = mp.Process(target=function, args=[num_threads_list[i]])
    p.start()
    processes.append(p)
    sleep(times[i])

for process in processes:
    process.join()
My question:
The list "times" is very long in my real script (it has 1000 entries). Given the time differences in "times", I guess at most 5 executions of the function run concurrently. I wonder whether each process terminates when it finishes executing the function, so that there are actually at most 5 processes running. Or does each process remain, so that there will eventually be 1000 processes, which sounds very strange given the number of CPU cores in my computer?
Please tell me if you think there is a better way to do what I explained above.
Thank you!
Upvotes: 0
Views: 2942
Reputation: 2263
You could also use a timer to do the job, as in the following code. I deliberately set 15 seconds for process 2 so that you can see it really does finish last, once its time has elapsed.
This code sample has two main functions. The first one, your_process_here(), is, as its name says, the place for your own code. The second one is a manager that launches the processes in slices so as not to overload the system.
Parameters
max_process : total number of processes launched by the script
simultp : maximum number of simultaneous processes
timegl : time guideline that defines how long each process waits, measured from the moment the parent starts. The actual waiting time is therefore at least the time defined in the guideline.
In other words, once its guideline time has elapsed, a process starts as soon as possible, given the maximum number of simultaneous processes allowed.
In this example
max_process = 6
simultp = 3
timegl = [1, 15, 1, 0.22, 6, 0.5] (for illustration only; an increasing series would be more logical here)
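
The waiting rule described above can be sketched as a small function. This is only an illustration, not part of the code further down: remaining_wait is a hypothetical helper name, and the `now` parameter is an assumption added so that the calculation can be checked without a real clock.

```python
import time

def remaining_wait(starttime, guideline_time, now=None):
    """Return how many seconds a process still has to sleep.

    starttime      -- time.time() value taken when the parent started
    guideline_time -- entry of timegl for this process, in seconds
    now            -- current time (hypothetical parameter, defaults to time.time())
    """
    if now is None:
        now = time.time()
    elapsed = now - starttime
    # Never return a negative wait: a late process simply starts at once.
    return max(0.0, guideline_time - elapsed)
```

For example, a process with a guideline of 15 seconds that becomes active 0.01 seconds after the parent started still has about 14.99 seconds to wait, while a process with a guideline of 0.22 seconds that becomes active after 3 seconds starts immediately.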
Result in the shell
simultaneously launched processes : 3
process n°2 is active and will wait 14.99 seconds more before treatment function starts
process n°1 is active and will wait 0.98 seconds more before treatment function starts
process n°3 is active and will wait 0.98 seconds more before treatment function starts
---- process n°1 ended ----
---- process n°3 ended ----
simultaneously launched processes : 3
process n°5 is active and will wait 2.88 seconds more before treatment function starts
process n°4 is active and will start now
---- process n°4 ended ----
---- process n°5 ended ----
simultaneously launched processes : 2
process n°6 is active and will start now
---- process n°6 ended ----
---- process n°2 ended ----
Code
import multiprocessing as mp
from threading import Timer
import time

def your_process_here(starttime, pnum, timegl):
    # Delay since the parent started
    delay_since_pstart = time.time() - starttime
    # Time to sleep in order to follow the time guideline as closely as possible
    diff = timegl[pnum-1] - delay_since_pstart
    if diff > 0: # if time elapsed since the parent started < guideline time
        print('process n°{0} is active and will wait {1} seconds more before treatment function starts'\
              .format(pnum, round(diff, 2)))
        time.sleep(diff) # wait for X more seconds
    else:
        print('process n°{0} is active and will start now'.format(pnum))
    ########################################################
    ## PUT THE CODE AFTER SLEEP() TO START CODE WITH A DELAY
    ## if pnum == 1:
    ##     function1()
    ## elif pnum == 2:
    ##     function2()
    ## ...
    print('---- process n°{0} ended ----'.format(pnum))

def process_manager(max_process, simultp, timegl, starttime=0, pnum=1, launchp=[]):
    # While the number of simultaneous current processes is less than simultp and
    # the number of processes launched so far is less than max_process
    while len(mp.active_children()) < simultp and len(launchp) < max_process:
        # Increment the process number
        pnum = len(launchp) + 1
        # Start a new process
        mp.Process(target=your_process_here, args=(starttime, pnum, timegl)).start()
        # History of all unique launched processes
        launchp = list(set(launchp + mp.active_children()))
    # ...
    ####### THE 2 FOLLOWING LINES SHOULD BE DELETED IN OPERATIONAL CODE ##########
    print('simultaneously launched processes : ', len(mp.active_children()))
    time.sleep(3) # optional: a 3-second pause before the next slice of processes is handled
    ##############################################################################
    if pnum < max_process:
        delay_repeat = 0.1 # 100 ms
        # If not all the processes have been launched, repeat the operation
        Timer(delay_repeat, process_manager, (max_process, simultp, timegl, starttime, pnum, launchp)).start()

if __name__ == '__main__':
    max_process = 6 # maximum number of processes
    simultp = 3 # maximum number of simultaneous processes, to save resources
    timegl = [1, 15, 1, 0.22, 6, 0.5] # Time guideline
    starttime = time.time()
    process_manager(max_process, simultp, timegl, starttime)
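
The question stores waits between launches, while timegl holds offsets measured from the parent's start. A minimal sketch of the conversion, assuming (as in the question's loop) that the first launch happens at time 0 and that each entry is the wait after the corresponding launch; gaps_to_guideline is a hypothetical helper name, and `initial=` requires Python 3.8 or later.

```python
from itertools import accumulate

def gaps_to_guideline(gaps):
    """Turn waits between launches into start offsets from the parent's start.

    Assumes the first launch happens at offset 0 and gaps[i] is the wait
    after launch i, so the last gap does not delay any launch.
    """
    if not gaps:
        return []
    # Running sum of all gaps except the last one, starting from 0.
    return list(accumulate(gaps[:-1], initial=0))

timegl = gaps_to_guideline([1, 10, 7, 3, 13, 19])
print(timegl)  # [0, 1, 11, 18, 21, 34]
```

The resulting list is increasing, which is the shape the time guideline is meant to have.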
Upvotes: 0
Reputation: 1410
The main problem I distill from your question is having a large number of processes running simultaneously.
You can prevent that by maintaining a list of processes with a maximum length. Something like this:
import multiprocessing as mp
from time import sleep
from random import randint

def function(num_threads):
    """
    This function creates num_threads threads to make num_threads requests
    """
    sleep(randint(3, 7))

# Time to wait in seconds between each execution of the function
times = [1, 10, 7, 3, 13, 19]
# List of number of requests to make for each execution of the function
num_threads_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
process_data_list = []
max_processes = 4

# =======================================================================================
def main():
    times_index = 0
    while times_index < len(times):
        # cleanup stopped processes -------------------------------
        cleanup_done = False
        while not cleanup_done:
            cleanup_done = True
            # search stopped processes
            for i, process_data in enumerate(process_data_list):
                if not process_data[1].is_alive():
                    print(f'process {process_data[0]} finished')
                    # remove from processes
                    p = process_data_list.pop(i)
                    del p
                    # start new search
                    cleanup_done = False
                    break
        # try start new process ---------------------------------
        if len(process_data_list) < max_processes:
            process = mp.Process(target=function, args=[num_threads_list[times_index]])
            process.start()
            process_data_list.append([times_index, process])
            print(f'process {times_index} started')
            times_index += 1
        else:
            sleep(0.1)
    # wait for all processes to finish --------------------------------
    while process_data_list:
        for i, process_data in enumerate(process_data_list):
            if not process_data[1].is_alive():
                print(f'process {process_data[0]} finished')
                # remove from processes
                p = process_data_list.pop(i)
                del p
                # start new search
                break
    print('ALL DONE !!!!!!')

# =======================================================================================
if __name__ == '__main__':
    main()
It runs at most max_processes processes at once, as you can see in the output:
process 0 started
process 1 started
process 2 started
process 3 started
process 3 finished
process 4 started
process 1 finished
process 5 started
process 0 finished
process 2 finished
process 5 finished
process 4 finished
ALL DONE !!!!!!
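
The cleanup step in the code above (pop one stopped process, then restart the search) could also be done in a single pass over the list. A minimal sketch, assuming the same [index, process] entries; prune_finished is a hypothetical helper name and is not part of the code above.

```python
def prune_finished(process_data_list):
    """Split [index, process] entries into running entries and finished indices.

    Each process only needs an is_alive() method, as mp.Process provides.
    Returns (still_running, finished_indices).
    """
    still_running = []
    finished_indices = []
    for index, process in process_data_list:
        if process.is_alive():
            still_running.append([index, process])
        else:
            finished_indices.append(index)
    return still_running, finished_indices
```

Inside main(), the cleanup block would then become something like `process_data_list[:], finished = prune_finished(process_data_list)`, followed by a print for each index in `finished`.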
Upvotes: 1