Reputation: 1179
run.py
import subprocess
from multiprocessing.pool import ThreadPool

def work(repo, cpuid):
    my_tool_subprocess = subprocess.Popen('./scan.py {} {}'.format(repo, cpuid),
                                          shell=True, stdout=subprocess.PIPE)
    line = True
    while line:
        myline = my_tool_subprocess.stdout.readline()
        if "scan is done" in myline:
            break

num = 10  # set to the number of workers you want (it defaults to the cpu count of your machine)
tp = ThreadPool(num)
cpuid = 1
for repo in repos:
    tp.apply_async(work, (repo[0], "1-240"))
    print('Running {} at core {}'.format(repo[0], "1-240"))
tp.close()
tp.join()
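A side note on the loop above, assuming Python 3: Popen.stdout yields bytes, so testing a str against it raises TypeError, and apply_async stores any exception raised inside the worker until .get() is called on its AsyncResult, so a worker thread can die silently. A minimal sketch:

```python
from multiprocessing.pool import ThreadPool

def worker(data):
    # In Python 3, testing a str against bytes raises TypeError
    return "scan is done" in data

tp = ThreadPool(1)
res = tp.apply_async(worker, (b"scan is done\n",))
tp.close()
tp.join()
try:
    res.get()
    outcome = 'ok'
except TypeError:
    # the error only surfaces when .get() is called
    outcome = 'TypeError'
print(outcome)  # TypeError
```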
scan.py
completed = subprocess.run(['git', 'clone', repo], env=my_env)
# ... a bunch of other subprocess.run() calls ...
# at the end:
print('returncode:', completed.returncode)
print('scan is done')
I was expecting the number of active processes to stay at 10 (10 threads), but somehow it does not. It seems it does not wait for "scan is done", the last statement in scan.py, but races through the whole list of repos (the for loop), cloning every repo in the list. To repeat: it does not wait for the 1st through 10th repos to be cloned and processed (maintaining a moving window of 10 processes); it just keeps going, creating additional processes and cloning more repos.
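For reference, apply_async is fire-and-forget: it returns an AsyncResult immediately, so the submitting loop races ahead of the workers, although the pool itself still caps concurrency at its size. A minimal sketch (with a hypothetical slow_task) illustrating both points:

```python
import time
from multiprocessing.pool import ThreadPool

def slow_task(i):
    # hypothetical stand-in for a long-running scan
    time.sleep(0.2)
    return i

tp = ThreadPool(2)  # pool of 2 workers
results = [tp.apply_async(slow_task, (i,)) for i in range(6)]
# The submission loop above returns immediately: no task has finished yet.
submitted_before_done = all(not r.ready() for r in results)
tp.close()
tp.join()  # join() is what actually waits for all tasks to finish
print(submitted_before_done)        # True
print([r.get() for r in results])   # [0, 1, 2, 3, 4, 5]
```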
Does anybody have an idea what is wrong here?
Upvotes: 0
Views: 715
Reputation: 23326
Try refactoring your code like this:
In scan.py, move all the module-level code into a function, e.g.:
def run(repo, cpuid):
    # do whatever scan.py does given a repo path and cpuid;
    # instead of printing to stdout, have this return a value
    ...
If you still care about scan.py having a command-line interface as well, add:
import argparse

def main(argv=None):
    parser = argparse.ArgumentParser()
    # ... implement command-line argument parsing here
    args = parser.parse_args(argv)
    value = run(args.repo, args.cpuid)
    print(value)

if __name__ == '__main__':
    main()
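As a concrete illustration of that skeleton (with a hypothetical run() stand-in filled in), it might look like:

```python
import argparse

def run(repo, cpuid):
    # hypothetical stand-in for the real scan logic: return, don't print
    return 'scanned {} on cores {}'.format(repo, cpuid)

def main(argv=None):
    parser = argparse.ArgumentParser(description='Scan a repository.')
    parser.add_argument('repo')
    parser.add_argument('cpuid')
    args = parser.parse_args(argv)
    value = run(args.repo, args.cpuid)
    print(value)
    return value

if __name__ == '__main__':
    # passing argv explicitly keeps the function easy to test;
    # a real script would call main() with no arguments
    main(['example-repo', '1-240'])
```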
Now, in your run.py, do something like:
import multiprocessing

import scan  # maybe give this a more specialized name

def work(args):
    repo, cpuid = args
    output = scan.run(repo, cpuid)
    for line in output.splitlines():
        ...  # Do whatever you want to do here

def main():
    repos = ...  # You didn't show us where this comes from
    pool = multiprocessing.Pool()  # Or pass however many processes
    pool.map(work, [(r[0], '1-240') for r in repos])

if __name__ == '__main__':
    main()
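Under these assumptions (with a hypothetical fake_scan standing in for scan.run, and placeholder repo data), a self-contained version of this pattern looks like:

```python
import multiprocessing

def fake_scan(repo, cpuid):
    # hypothetical stand-in for scan.run(): returns its output as a string
    return '{} {}\nscan is done'.format(repo, cpuid)

def work(args):
    repo, cpuid = args
    output = fake_scan(repo, cpuid)
    # do whatever per-line processing you need; here, keep the last line
    return output.splitlines()[-1]

def main():
    repos = [('repo-a',), ('repo-b',), ('repo-c',)]  # placeholder data
    with multiprocessing.Pool(2) as pool:  # at most 2 concurrent workers
        return pool.map(work, [(r[0], '1-240') for r in repos])

if __name__ == '__main__':
    print(main())  # ['scan is done', 'scan is done', 'scan is done']
```

Pool.map blocks until every task is done and returns results in input order, which is exactly the "moving window of N workers" behavior the question was after.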
Something like this. The point I'm trying to make here is that if you factor your code wisely it will make multiprocessing much simpler. Some of the details here are slightly opinionated, however.
Upvotes: 2