Reputation: 31
I have a single-core pipeline that I am trying to run hundreds of times in parallel across the 32 cores of an AWS instance. The idea is that as soon as one run completes, the next one should start. I have written a queueing script based on one I found online, and it seems to work, but efficiency trails off over time, as if some workers stop accepting new jobs while others keep going.
Here is the main code from my script (the indentation is correct in my actual script and everything runs, so I don't think that's the issue):
from multiprocessing import Process, Queue, current_process

# Pipeline, PathtoFiles and testPipelineList are defined earlier in the script.

def worker(work_queue, done_queue):
    # Pull file names off the queue until the 'STOP' sentinel arrives.
    for f in iter(work_queue.get, 'STOP'):
        try:
            newPipe = Pipeline(PathtoFiles + testPipelineList, PathtoFiles, ('parallel', f))
            done_queue.put("%s - %s got %s." % (current_process().name, f, newPipe))
            log("%s - %s got %s." % (current_process().name, f, newPipe))
        except Exception, e:
            # Catch per item, inside the loop: with the try/except wrapped
            # around the whole loop (as originally written), the first failed
            # pipeline makes the worker return, and that core sits idle from
            # then on.
            done_queue.put("%s failed on %s with: %s" % (current_process().name, f, e))
            log("%s failed on %s with: %s" % (current_process().name, f, e))
    return True
def log(string):
    outfile = open('queue_gene_logfile' + testPipelineList, 'a')
    outfile.write(string + '\n')
    outfile.close()
def main():
    ############### pipeline specific stuff
    li = []
    infile = open(PathtoFiles + '/' + testPipelineList, 'r')  # list of OGs of interest
    count = 0
    for line in infile:  # make one input file per line and append its name to li
        outfile = open(PathtoFiles + '/FileLists_' + testPipelineList + '/list' + str(count), 'w')
        li.append('list' + str(count))
        outfile.write(line)
        outfile.close()
        count = count + 1
    infile.close()
    ############### queue stuff below
    workers = 32
    work_queue = Queue()
    done_queue = Queue()
    processes = []
    for f in li:  # each entry is the argument (a FileList name) for one pipeline run
        work_queue.put(f)
    for w in xrange(workers):
        # one 'STOP' per worker: iter(work_queue.get, 'STOP') consumes the
        # sentinel it sees, so a single 'STOP' would shut down only one worker
        # and leave the other 31 blocked on work_queue.get forever
        work_queue.put('STOP')
    for w in xrange(workers):
        p = Process(target=worker, args=(work_queue, done_queue))
        p.start()
        processes.append(p)  # a list of processes to join
    for p in processes:
        p.join()
    done_queue.put('STOP')
    # the status strings are small, so they fit in the queue's pipe buffer;
    # for bulky results, drain done_queue before joining the workers
    for status in iter(done_queue.get, 'STOP'):
        print status

if __name__ == '__main__':
    main()
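For what it's worth, here is a minimal, self-contained version of the same queue-and-sentinel pattern with a sleep standing in for the pipeline (the job count and sleep times are made up); running it is a quick way to check that every worker keeps pulling jobs until the queue drains:

from multiprocessing import Process, Queue, current_process
import random
import time

def worker(work_queue, done_queue):
    for item in iter(work_queue.get, 'STOP'):
        time.sleep(random.uniform(0.1, 0.5))  # stand-in for one pipeline run
        done_queue.put('%s finished %s' % (current_process().name, item))

if __name__ == '__main__':
    workers = 4
    work_queue = Queue()
    done_queue = Queue()
    for i in xrange(20):
        work_queue.put('job%d' % i)
    for w in xrange(workers):
        work_queue.put('STOP')  # one sentinel per worker
    processes = [Process(target=worker, args=(work_queue, done_queue))
                 for w in xrange(workers)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    done_queue.put('STOP')
    for status in iter(done_queue.get, 'STOP'):
        print status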
And here is a screenshot of the CPU usage monitor from Amazon (I started a new run of 100 processes in the middle, after the first 100 finished). Ugh, I don't have enough reputation to post my image - it is here: http://www.science.smith.edu/departments/Biology/lkatz/images/CPU_use.png
Can anyone see why my efficiency goes down so steadily? Obviously, runs are being started after the initial 32, but not all cores are being used at all times.
Edit: I set up a 200-job run yesterday. At first, all 32 cores were running; now, although there are 70 jobs left to do, only 18 are running according to top. I'm not sure whether there is any way to find out what the cores are doing (if anything), or whether there is anything I can do (even manually) to get the rest going. Here's what top looks like (see the sketch after the output) - lots of sleeping/zombie jobs. Are these the problem, maybe?
top - 13:33:01 up 6 days, 20:59, 4 users, load average: 16.02, 16.05, 16.32
Tasks: 691 total, 17 running, 567 sleeping, 79 stopped, 28 zombie
%Cpu(s): 49.8 us, 0.2 sy, 0.0 ni, 50.0 id, 0.0 wa, 0.0 hi, 0.0 si, 0.0 st
MiB Mem: 245742 total, 69853 used, 175889 free, 1981 buffers
MiB Swap: 0 total, 0 used, 0 free, 35477 cached
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
69054 ubuntu 20 0 167m 157m 1272 R 100 0.1 210:06.19 raxmlHPC
2281 ubuntu 20 0 127m 118m 1268 R 100 0.0 345:45.54 raxmlHPC
55524 ubuntu 20 0 99520 89m 1288 R 100 0.0 242:37.41 raxmlHPC
64499 ubuntu 20 0 164m 154m 1276 R 100 0.1 217:24.57 raxmlHPC
72964 ubuntu 20 0 140m 130m 1280 R 100 0.1 202:28.98 raxmlHPC
73077 ubuntu 20 0 173m 163m 1272 R 100 0.1 202:18.51 raxmlHPC
78482 ubuntu 20 0 92484 80m 1276 R 100 0.0 195:14.09 raxmlHPC
78497 ubuntu 20 0 104m 96m 1292 R 100 0.0 195:11.02 raxmlHPC
79501 ubuntu 20 0 134m 124m 1272 R 100 0.1 191:59.39 raxmlHPC
86534 ubuntu 20 0 135m 125m 1276 R 100 0.1 91:46.00 raxmlHPC
88892 ubuntu 20 0 111m 102m 1276 R 100 0.0 84:53.55 raxmlHPC
90763 ubuntu 20 0 102m 93m 1276 R 100 0.0 77:32.76 raxmlHPC
94270 ubuntu 20 0 60544 50m 1280 R 100 0.0 42:05.16 raxmlHPC
95210 ubuntu 20 0 73604 62m 1280 R 100 0.0 39:38.67 raxmlHPC
102045 ubuntu 20 0 112m 86m 1272 R 100 0.0 3:31.35 raxmlHPC
98813 ubuntu 20 0 76844 51m 1268 R 100 0.0 21:21.22 raxmlHPC
102046 ubuntu 20 0 19808 2148 1120 R 1 0.0 0:00.11 top
2280 ubuntu 20 0 4440 628 528 S 0 0.0 0:00.00 sh
4343 ubuntu 20 0 25580 1116 880 S 0 0.0 0:00.31 screen
4350 ubuntu 20 0 134m 16m 3748 S 0 0.0 0:00.13 python
4359 ubuntu 20 0 143m 24m 1884 S 0 0.0 8:02.44 python
4360 ubuntu 20 0 145m 26m 1884 S 0 0.0 3:05.85 python
4363 ubuntu 20 0 0 0 0 Z 0 0.0 0:05.23 python
4366 ubuntu 20 0 0 0 0 Z 0 0.0 0:03.89 python
4370 ubuntu 20 0 0 0 0 Z 0 0.0 0:48.45 python
4372 ubuntu 20 0 142m 23m 1860 S 0 0.0 4:40.39 python
4376 ubuntu 20 0 143m 24m 1884 S 0 0.0 1:40.96 python
4379 ubuntu 20 0 0 0 0 Z 0 0.0 0:05.15 python
4383 ubuntu 20 0 141m 22m 1884 S 0 0.0 0:39.52 python
4386 ubuntu 20 0 0 0 0 Z 0 0.0 0:11.20 python
4388 ubuntu 20 0 146m 27m 1888 S 0 0.0 9:08.29 python
4390 ubuntu 20 0 143m 24m 1888 S 0 0.0 3:56.54 python
4392 ubuntu 20 0 0 0 0 Z 0 0.0 0:02.82 python
4396 ubuntu 20 0 0 0 0 Z 0 0.0 0:56.10 python
4401 ubuntu 20 0 0 0 0 Z 0 0.0 1:20.73 python
4405 ubuntu 20 0 0 0 0 Z 0 0.0 1:59.12 python
4408 ubuntu 20 0 141m 22m 1884 S 0 0.0 1:50.49 python
4413 ubuntu 20 0 143m 24m 1880 S 0 0.0 2:12.69 python
4420 ubuntu 20 0 0 0 0 Z 0 0.0 0:22.77 python
4424 ubuntu 20 0 144m 25m 1884 S 0 0.0 3:21.91 python
4428 ubuntu 20 0 145m 25m 1884 S 0 0.0 3:22.17 python
4431 ubuntu 20 0 0 0 0 Z 0 0.0 2:14.93 python
4434 ubuntu 20 0 141m 22m 1880 S 0 0.0 4:02.82 python
4438 ubuntu 20 0 142m 23m 1880 S 0 0.0 2:46.69 python
4443 ubuntu 20 0 0 0 0 Z 0 0.0 0:05.89 python
4446 ubuntu 20 0 143m 23m 1884 S 0 0.0 2:20.15 python
4450 ubuntu 20 0 144m 25m 1884 S 0 0.0 2:50.68 python
4453 ubuntu 20 0 143m 24m 1888 S 0 0.0 1:12.57 python
4457 ubuntu 20 0 0 0 0 Z 0 0.0 0:22.13 python
4461 ubuntu 20 0 0 0 0 Z 0 0.0 0:51.41 python
5688 ubuntu 20 0 4440 624 528 T 0 0.0 0:00.00 sh
5689 ubuntu 20 0 148m 138m 1276 T 0 0.1 240:22.68 raxmlHPC
8337 ubuntu 20 0 4440 628 528 T 0 0.0 0:00.00 sh
8338 ubuntu 20 0 153m 144m 1280 T 0 0.1 236:11.93 raxmlHPC
10025 ubuntu 20 0 4440 624 524 T 0 0.0 0:00.00 sh
10026 ubuntu 20 0 137m 127m 1276 T 0 0.1 234:20.39 raxmlHPC
10043 ubuntu 20 0 4440 628 528 T 0 0.0 0:00.00 sh
10044 ubuntu 20 0 127m 117m 1276 T 0 0.0 233:17.22 raxmlHPC
12386 ubuntu 20 0 25580 1116 880 S 0 0.0 0:00.45 screen
12433 ubuntu 20 0 4440 624 528 T 0 0.0 0:00.00 sh
12434 ubuntu 20 0 73660 64m 1292 T 0 0.0 160:00.80 raxmlHPC
14688 ubuntu 20 0 4440 624 528 T 0 0.0 0:00.00 sh
14689 ubuntu 20 0 88780 77m 1276 T 0 0.0 143:17.54 raxmlHPC
36470 ubuntu 20 0 25956 1744 968 S 0 0.0 0:59.78 screen
36471 ubuntu 20 0 20904 3532 1860 S 0 0.0 0:00.43 bash
45234 ubuntu 20 0 4440 628 528 T 0 0.0 0:00.00 sh
45235 ubuntu 20 0 116m 91m 1272 T 0 0.0 31:54.50 raxmlHPC
55523 ubuntu 20 0 4440 628 528 S 0 0.0 0:00.00 sh
64498 ubuntu 20 0 4440 628 528 S 0 0.0 0:00.00 sh
69053 ubuntu 20 0 4440 628 528 S 0 0.0 0:00.00 sh
71281 ubuntu 20 0 95004 1876 892 S 0 0.0 0:05.47 sshd
71310 ubuntu 20 0 21356 3984 1860 S 0 0.0 0:00.36 bash
72963 ubuntu 20 0 4440 628 528 S 0 0.0 0:00.00 sh
73076 ubuntu 20 0 4440 624 524 S 0 0.0 0:00.00 sh
78481 ubuntu 20 0 4440 628 528 S 0 0.0 0:00.00 sh
78496 ubuntu 20 0 4440 624 524 S 0 0.0 0:00.00 sh
79500 ubuntu 20 0 4440 624 524 S 0 0.0 0:00.00 sh
81682 ubuntu 20 0 4440 624 528 T 0 0.0 0:00.00 sh
81685 ubuntu 20 0 94980 83m 1276 T 0 0.0 48:33.73 raxmlHPC
83916 ubuntu 20 0 4440 628 528 T 0 0.0 0:00.00 sh
83917 ubuntu 20 0 65964 47m 1272 T 0 0.0 12:19.77 raxmlHPC
84034 ubuntu 20 0 25844 1612 968 S 0 0.0 0:10.79 screen
84035 ubuntu 20 0 20896 3524 1860 S 0 0.0 0:00.15 bash
84089 ubuntu 20 0 134m 16m 3748 T 0 0.0 0:00.14 python
84096 ubuntu 20 0 146m 27m 1860 T 0 0.0 3:07.70 python
84097 ubuntu 20 0 0 0 0 Z 0 0.0 5:13.17 python
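One way to see what those processes are actually doing is to walk the process tree from the queueing script's PID; a minimal sketch, assuming the third-party psutil package is installed and the parent PID is passed as a command-line argument:

import sys
import psutil  # third-party: pip install psutil

# Report the state (running, sleeping, stopped, zombie) of every
# descendant of the queueing script.
parent = psutil.Process(int(sys.argv[1]))
for child in parent.children(recursive=True):
    print '%d\t%s\t%s' % (child.pid, child.status(), ' '.join(child.cmdline()))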
Upvotes: 2
Views: 490
Reputation: 6466
Here is a simple example where the work to be done is defined in the function "hmmsearch". The objects to be worked on are defined in "bins". This will keep all 32 workers busy:
from multiprocessing import Pool
from tqdm import tqdm

def hmmsearch(bin):
    return bin.pfams.run(cpus=1)

bins = bt.merged.results.binner.results.bins  # bt comes from the surrounding project

pool = Pool(processes=32)
iterator = pool.imap(hmmsearch, bins, chunksize=1)  # hand out one bin at a time
for i, result in enumerate(tqdm(iterator, total=len(bins))):
    print "Done bin %s." % bins[i]
pool.close()
pool.join()
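Adapted to the question's setup, it might look something like the sketch below - untested against the real Pipeline class, with run_one and its error handling as illustrative assumptions (li is the list of FileList names built in the question's main()):

from multiprocessing import Pool

def run_one(f):
    # f is one FileList name, as in the question's work_queue
    try:
        return '%s -> %s' % (f, Pipeline(PathtoFiles + testPipelineList,
                                         PathtoFiles, ('parallel', f)))
    except Exception, e:
        # returning instead of raising keeps the pool running past failures
        return '%s failed: %s' % (f, e)

pool = Pool(processes=32)
for status in pool.imap_unordered(run_one, li, chunksize=1):
    print status
pool.close()
pool.join()

Because the Pool hands a new item to a worker as soon as it finishes the previous one, no core should go idle while work remains.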
Upvotes: 2