Reputation: 3040
I have some large lists containing data that need processing. I'm using multiprocessing to get the job done faster, and it works great unless the lists contain more than 100 entries, since it then starts to use a lot of CPU and memory.
I have provided some sample data:
changed_devices = [["3036c360", "013-HX.ITEM-UA-01", "DESCRIPTION 6", "172.29.6.2", "Location#All Locations#013-LOCATION-1", "IPSEC#Is IPSEC Device#No", "Device Type#All Device Types#Switch", "ISE Config#ISE Config#With ISE Config Monitor Mode", "ISE_enable#ISE_enable#YES - Monitor Mode"],
                   ["306fd4c0", "013-HX.ITEM-UA-02", "DESCRIPTION 7", "172.29.6.3", "Location#All Locations#013-LOCATION-1", "IPSEC#Is IPSEC Device#No", "Device Type#All Device Types#Switch", "ISE Config#ISE Config#With ISE Config Monitor Mode", "ISE_enable#ISE_enable#YES - Monitor Mode"],
                   ["30a8bf10", "013-UX.ITEM-UA-01", "DESCRIPTION 8", "172.29.6.10", "Location#All Locations#013-LOCATION-1", "IPSEC#Is IPSEC Device#No", "Device Type#All Device Types#Switch", "ISE Config#ISE Config#With ISE Config Monitor Mode", "ISE_enable#ISE_enable#YES - Monitor Mode"],
                   ["30e293c0", "013-UX1.ITEM-UA-01", "DESCRIPTION 1", "172.29.6.4", "Location#All Locations#013-LOCATION-1", "IPSEC#Is IPSEC Device#No", "Device Type#All Device Types#Switch", "ISE Config#ISE Config#With ISE Config Monitor Mode", "ISE_enable#ISE_enable#YES - Monitor Mode"],
                   ["311cb690", "013-UX2.ITEM-UA-01", "DESCRIPTION 2", "172.29.6.5", "Location#All Locations#013-LOCATION-1", "IPSEC#Is IPSEC Device#No", "Device Type#All Device Types#Switch", "ISE Config#ISE Config#With ISE Config Monitor Mode", "ISE_enable#ISE_enable#YES - Monitor Mode"],
                   ["3155ef00", "013-UX3.ITEM-UA-01", "DESCRIPTION 3", "172.29.6.6", "Location#All Locations#013-LOCATION-1", "IPSEC#Is IPSEC Device#No", "Device Type#All Device Types#Switch", "ISE Config#ISE Config#With ISE Config Monitor Mode", "ISE_enable#ISE_enable#YES - Monitor Mode"],
                   ["318ed950", "013-UX4.ITEM-UA-01", "DESCRIPTION 4", "172.29.6.7", "Location#All Locations#013-LOCATION-1", "IPSEC#Is IPSEC Device#No", "Device Type#All Device Types#Switch", "ISE Config#ISE Config#With ISE Config Monitor Mode", "ISE_enable#ISE_enable#YES - Monitor Mode"],
                   ["31c8d510", "013-UX5.ITEM-UA-01", "DESCRIPTION 5", "172.29.6.8", "Location#All Locations#013-LOCATION-1", "IPSEC#Is IPSEC Device#No", "Device Type#All Device Types#Switch", "ISE Config#ISE Config#With ISE Config Monitor Mode", "ISE_enable#ISE_enable#YES - Monitor Mode"],
                   ["3202f7e0", "013-UX5.ITEM-UA-02", "DESCRIPTION 5", "172.29.6.9", "Location#All Locations#013-LOCATION-1", "IPSEC#Is IPSEC Device#No", "Device Type#All Device Types#Switch", "ISE Config#ISE Config#With ISE Config Monitor Mode", "ISE_enable#ISE_enable#YES - Monitor Mode"]]
My current code:
if __name__ == "__main__":
    processes = []
    for chg in range(len(changed_devices)):
        # Equivalent to passing changed_devices[chg][0] through [10] individually
        p = multiprocessing.Process(target=start_exscript, args=(accounts,) + tuple(changed_devices[chg]))
        processes.append(p)
        p.start()
    for process in processes:
        process.join()
The lists can contain more than 5000 entries, so I would like the script to account for this and only run around 100 processes at a time. Does anyone have an idea?
UPDATE:
I still cannot get this to work, so I'll supply some extra details.
I'm using Python 2.7 and I couldn't get concurrent.futures to work.
The start_exscript is just another function; it looks like this:
def start_exscript(accounts, host, ena, ise_nwdvs_id, ise_nwdvs_name, ise_nwdvs_ip, ise_nwdvs_desc, ise_nwdvs_loc, ise_nwdvs_ipsec, ise_nwdvs_type, ise_nwdvs_cfg, ise_nwdvs_ena):
    try:
        start(accounts, host, bind(update_config_cisco_device, ena), verbose=0)  # starts Python Exscript
        updateisedvsdata = [ise_nwdvs_id, ise_nwdvs_name, ise_nwdvs_ip, ise_nwdvs_desc, ise_nwdvs_loc, ise_nwdvs_ipsec, ise_nwdvs_type, ise_nwdvs_cfg, ise_nwdvs_ena]
        ise_update_nw_dvs = ise_update_network_device(updateisedvsdata)  # Start update function
        if ise_update_nw_dvs:
            logger.critical("[ROOT]: Updating ISE config variable on: [ "+ise_nwdvs_name+" ] [ OK ]")
        else:
            logger.critical("[ROOT]: Updating ISE config variable on: [ "+ise_nwdvs_name+" ] [ Failed ]")
    except Exception as e:
        logger.critical(e.message, e.args)
Python Exscript is just an add-on that can log in to network devices.
I'm having trouble passing all the arguments I have using Pool.
I tried this, but it won't start:
cpus = multiprocessing.cpu_count() - 1
pool = Pool(processes=cpus)
for chg in range(len(changed_devices)):
    pool.apply_async(start_exscript, args=(accounts,) + tuple(changed_devices[chg]))
pool.close()
pool.join()
Nothing happens with this. I don't even get an error, which confuses me even more.
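A likely reason nothing appears is that apply_async returns immediately and silently discards worker exceptions unless you keep the AsyncResult it returns and call .get() on it. A minimal sketch of that (with a hypothetical worker function, not the real start_exscript):

```python
from multiprocessing import Pool

def work(x):
    # Hypothetical stand-in for start_exscript; fails when x == 0.
    return 10 / x

if __name__ == "__main__":
    pool = Pool(processes=2)
    async_results = [pool.apply_async(work, (x,)) for x in (5, 0)]
    pool.close()
    pool.join()
    for r in async_results:
        try:
            print(r.get())  # get() re-raises any exception from the worker
        except ZeroDivisionError as exc:
            print("worker failed:", exc)
```

Without the .get() calls, the ZeroDivisionError from the second task would vanish without a trace, which matches the "no error, nothing happens" symptom.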
UPDATE 2
Here is the final working result for anyone out there looking. Thanks a lot to SyntaxVoid.
def start_exscript(args):
    try:
        # --- PLEASE READ ---
        # Setup Switch/Router CLI access - remember to change
        cliuser = "x"
        clipass = "x"
        accounts = Account(cliuser, clipass, needs_lock=False)
        host = args[0]
        ena = args[1]
        ise_nwdvs_id = args[2]
        ise_nwdvs_name = args[3]
        ise_nwdvs_ip = args[4]
        ise_nwdvs_desc = args[5]
        ise_nwdvs_loc = args[6]
        ise_nwdvs_ipsec = args[7]
        ise_nwdvs_type = args[8]
        ise_nwdvs_cfg = args[9]
        ise_nwdvs_ena = args[10]
        start(accounts, host, bind(update_config_cisco_device, ena), verbose=0)  # Start Exscript process
        updateisedvsdata = [ise_nwdvs_id, ise_nwdvs_name, ise_nwdvs_ip, ise_nwdvs_desc, ise_nwdvs_loc, ise_nwdvs_ipsec, ise_nwdvs_type, ise_nwdvs_cfg, ise_nwdvs_ena]  # Update data
        ise_update_nw_dvs = ise_update_network_device(updateisedvsdata)  # Start update process
        if ise_update_nw_dvs:  # If update went well
            logger.critical("[ROOT]: Updating ISE config variable on: [ "+ise_nwdvs_name+" ] [ OK ]")
        else:
            logger.critical("[ROOT]: Updating ISE config variable on: [ "+ise_nwdvs_name+" ] [ Failed ]")
    except Exception as e:
        logger.critical(e.message, e.args)

if __name__ == "__main__":
    cpus = multiprocessing.cpu_count() - 1
    pool = Pool(processes=cpus)
    results = pool.map(start_exscript, changed_devices)
Upvotes: 1
Views: 336
Reputation: 2633
Here's an example using the multiprocessing module.
Edit: Originally this manually handled the queues, but as Roland pointed out, Pool is already designed to do that for you.
In the if __name__ == "__main__" block, we create the Pool with the maximum number of processes we want to allow and map it over our list of input arguments. For this example, it will calculate the square of 0 through 24. Because of how map works, the order of results will match the order of your input arguments, even though they may finish executing out of order. The random processing time in the worker function helps demonstrate that.
Code:
from multiprocessing import Pool
import os
import time
import random

def worker(x):
    pid = os.getpid()
    print(f"PID: {pid} got {x}. Evaluating now...")
    processing_time = random.uniform(0.2, 5)
    time.sleep(processing_time)
    output = x**2
    print(f"PID: {pid} done evaluating in {processing_time:.2f}s. {x}**2 = {output}.")
    return output

if __name__ == "__main__":
    max_processes = 5
    with Pool(max_processes) as pool:
        results = pool.map(worker, range(25))
    print(results)
EDIT:
To help apply this to your code, take a look at the pool.map
documentation.
map(func, iterable[, chunksize])
A parallel equivalent of the map() built-in function (it supports only one iterable argument though). It blocks until the result is ready.
This method chops the iterable into a number of chunks which it submits to the process pool as separate tasks. The (approximate) size of these chunks can be specified by setting chunksize to a positive integer.
Each element of the iterable you pass should be the set of arguments you want for each function call. Unfortunately, I don't think you'll be able to use keyword arguments for your start_exscript function, since you are only able to pass a single object. There are several ways around this, but I think this is the most straightforward:
Redefine your start_exscript function like this:

def start_exscript(args):
    (accounts, host, ena, ise_nwdvs_id, ise_nwdvs_name, ise_nwdvs_ip,
     ise_nwdvs_desc, ise_nwdvs_loc, ise_nwdvs_ipsec, ise_nwdvs_type,
     ise_nwdvs_cfg, ise_nwdvs_ena) = args
    # The rest of your function can stay the same
If you do this, then the object you use as your iterable to pass to map should look like a list of lists where each inner list is the arguments to pass to start_exscript
. Like:
pass_this_to_map = [ [ARGUMENTS_FOR_FIRST_CALL], [ARGUMENTS_FOR_SECOND_CALL], ... ]
If I'm understanding your code correctly, I think all you have to do is pass your changed_devices
list to map: pool.map(start_exscript, changed_devices)
after redefining your function like I did above.
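Since the redefined start_exscript unpacks accounts as its first value, but the changed_devices rows don't contain it, one way to bridge the gap (sketched here with placeholder data, not the real Account object or full device rows) is to prepend the shared accounts object to every row before mapping:

```python
accounts = "ACCOUNTS"  # placeholder for the real Exscript Account object
changed_devices = [["id1", "name1"], ["id2", "name2"]]  # trimmed stand-in rows

# Prepend the shared accounts object so start_exscript(args) can unpack it first.
pass_this_to_map = [[accounts] + device for device in changed_devices]
print(pass_this_to_map[0])  # ['ACCOUNTS', 'id1', 'name1']
```

With that in place, pool.map(start_exscript, pass_this_to_map) hands each worker exactly one list containing everything it needs.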
If you don't have access to the source code of start_exscript
(as in, you can't change it) then you have to write a wrapper like this:
def start_exscript_wrapper(f):
    def inner(args):
        return f(*args)
    return inner
And then do
start_exscript = start_exscript_wrapper(start_exscript)
before you call it.
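To illustrate, the wrapper can be exercised with a hypothetical three-argument function standing in for start_exscript:

```python
def start_exscript_wrapper(f):
    def inner(args):
        return f(*args)  # unpack the single list into positional arguments
    return inner

def demo(a, b, c):  # hypothetical stand-in for start_exscript
    return a + b + c

demo = start_exscript_wrapper(demo)
print(demo([1, 2, 3]))  # the wrapped version takes one list and unpacks it: 6
```

The original function is untouched; only the name is rebound to the single-argument version that Pool.map can call.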
Upvotes: 2
Reputation: 43533
What you should do in such a case is use a limited number of processes and let them take care of all the items in the list.
Luckily, Python has you covered in this case.
Basically, there are three solutions:
multiprocessing.Pool
concurrent.futures.ThreadPoolExecutor
concurrent.futures.ProcessPoolExecutor
The name of your current worker function, start_exscript
, kind of implies that you are using e.g. subprocess
to start an external program.
If that is correct, I would suggest using concurrent.futures.ThreadPoolExecutor
. Here is one of my programs as a complete working example. It uses ffmpeg
to do a batch conversion of videos to MKV format:
from functools import partial
import argparse
import concurrent.futures as cf
import logging
import os
import subprocess as sp
import sys

__version__ = '1.4.1'


def main(argv):
    """
    Entry point for vid2mkv.

    Arguments:
        argv: Command line arguments.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        '-q', '--videoquality', type=int, default=6, help='video quality (0-10, default 6)'
    )
    parser.add_argument(
        '-a', '--audioquality', type=int, default=3, help='audio quality (0-10, default 3)'
    )
    parser.add_argument(
        '--log',
        default='warning',
        choices=['debug', 'info', 'warning', 'error'],
        help="logging level (defaults to 'warning')"
    )
    parser.add_argument('-v', '--version', action='version', version=__version__)
    parser.add_argument("files", metavar='file', nargs='+', help="one or more files to process")
    args = parser.parse_args(argv)
    logging.basicConfig(
        level=getattr(logging, args.log.upper(), None), format='%(levelname)s: %(message)s'
    )
    logging.debug(f'command line arguments = {argv}')
    logging.debug(f'parsed arguments = {args}')
    # Check for required programs.
    try:
        sp.run(['ffmpeg'], stdout=sp.DEVNULL, stderr=sp.DEVNULL)
        logging.debug('found “ffmpeg”')
    except FileNotFoundError:
        logging.error('the “ffmpeg” program cannot be found')
        sys.exit(1)
    # Work starts here.
    starter = partial(runencoder, vq=args.videoquality, aq=args.audioquality)
    with cf.ThreadPoolExecutor(max_workers=os.cpu_count()) as tp:
        fl = [tp.submit(starter, t) for t in args.files]
        for fut in cf.as_completed(fl):
            fn, rv = fut.result()
            if rv == 0:
                logging.info(f'finished "{fn}"')
            elif rv < 0:
                logging.warning(f'file "{fn}" has unknown extension, ignoring it.')
            else:
                logging.error(f'conversion of "{fn}" failed, return code {rv}')


def runencoder(fname, vq, aq):
    """
    Convert a video file to Theora/Vorbis streams in a Matroska container.

    Arguments:
        fname: Name of the file to convert.
        vq: Video quality. See ffmpeg docs.
        aq: Audio quality. See ffmpeg docs.

    Returns:
        (fname, return value)
    """
    basename, ext = os.path.splitext(fname)
    known = [
        '.mp4', '.avi', '.wmv', '.flv', '.mpg', '.mpeg', '.mov', '.ogv', '.mkv', '.webm', '.gif'
    ]
    if ext.lower() not in known:
        return (fname, -1)
    ofn = basename + '.mkv'
    args = [
        'ffmpeg', '-i', fname, '-c:v', 'libtheora', '-q:v',
        str(vq), '-c:a', 'libvorbis', '-q:a',
        str(aq), '-sn', '-y', ofn
    ]
    logging.debug(' '.join(args))
    logging.info(f'starting conversion of "{fname}".')
    cp = sp.run(args, stdout=sp.DEVNULL, stderr=sp.DEVNULL)
    return fname, cp.returncode


if __name__ == '__main__':
    main(sys.argv[1:])
Of note:
- I use functools.partial to pre-supply arguments that do not change between calls to runencoder.
- I use as_completed on a list of Futures. This will yield results as soon as they become available.
- When using multiprocessing.Pool or concurrent.futures.ProcessPoolExecutor, or when running a number of subprocesses by hand, it usually makes no sense to start more processes than your CPU has cores. As you have discovered, you get lots of processes fighting over CPU resources.
- If all encoder runs took approximately the same time, I could have used the map method of the Executor, which is slightly simpler. And that would return results in the order they were submitted.
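The difference between the two can be seen in a small sketch: Executor.map yields results in submission order, while as_completed yields futures in the order they actually finish (the staggered sleeps below make later inputs finish sooner on purpose):

```python
import concurrent.futures as cf
import time

def slow_square(x):
    time.sleep(0.3 - 0.1 * x)  # x=0 sleeps longest, x=2 shortest
    return x * x

with cf.ThreadPoolExecutor(max_workers=3) as tp:
    # map: results come back in submission order, regardless of finish time
    in_order = list(tp.map(slow_square, range(3)))
    # as_completed: results arrive as each future finishes
    futures = [tp.submit(slow_square, x) for x in range(3)]
    as_done = [f.result() for f in cf.as_completed(futures)]

print(in_order)  # [0, 1, 4]
print(as_done)   # completion order, likely [4, 1, 0]
```

Both approaches run the same work; the choice is only about whether you want results ordered or as early as possible.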
If you are not using subprocess in the worker function, but are doing the actual work in Python, simply change ThreadPoolExecutor to ProcessPoolExecutor and you'd be fine.
Edit: since you are on Python 2.7, note that a backport of concurrent.futures is available.
But in your use-case, multiprocessing.Pool
should work OK.
Upvotes: 2