Reputation: 23
I'm trying to solve a problem where I store the locations and counts of substrings of a given length. Since the strings can be long (genome sequences), I'm trying to use multiple processes to speed it up. The program runs, but the variables that store the results seem to lose all their information once the worker processes finish.
import numpy
import multiprocessing
from multiprocessing.managers import BaseManager, DictProxy
from collections import defaultdict, namedtuple, Counter
from functools import partial
import ctypes as c

class MyManager(BaseManager):
    pass

MyManager.register('defaultdict', defaultdict, DictProxy)

def gc_count(seq):
    return int(100 * ((seq.upper().count('G') + seq.upper().count('C') + 0.0) / len(seq)))

def getreads(length, table, counts, genome):
    genome_len = len(genome)
    for start in range(0,genome_len):
        gc = gc_count(genome[start:start+length])
        table[ (length, gc) ].append( (start) )
        counts[length,gc] +=1

if __name__ == "__main__":
    g = 'ACTACGACTACGACTACGCATCAGCACATACGCATACGCATCAACGACTACGCATACGACCATCAGATCACGACATCAGCATCAGCATCACAGCATCAGCATCAGCACTACAGCATCAGCATCAGCATCAG'
    genome_len = len(g)

    mgr = MyManager()
    mgr.start()
    m = mgr.defaultdict(list)
    mp_arr = multiprocessing.Array(c.c_double, 10*101)
    arr = numpy.frombuffer(mp_arr.get_obj())
    count = arr.reshape(10,101)

    pool = multiprocessing.Pool(9)
    partial_getreads = partial(getreads, table=m, counts=count, genome=g)
    pool.map(partial_getreads, range(1, 10))
    pool.close()
    pool.join()

    for i in range(1, 10):
        for j in range(0,101):
            print count[i,j]

    for i in range(1, 10):
        for j in range(0,101):
            print len(m[(i,j)])
The loops at the end only print out 0.0 for each element in count and 0 for each list in m, so somehow I'm losing all the counts. If I print the counts within the getreads(...) function, I can see that the values are being increased. Conversely, printing len(table[ (length, gc) ]) in getreads(...) or len(m[(i,j)]) in the main body only results in 0.
Upvotes: 2
Views: 810
Reputation: 2279
You could also formulate your problem as a map-reduce problem, which would avoid sharing the data among multiple processes (I suspect it would also speed up the computation). You would just need to return the resulting table and counts from the function (the map step) and combine the results from all the processes (the reduce step).
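As a rough sketch of that map-reduce approach (assuming the gc_count helper and genome string g from your script; getreads_local is just an illustrative name), each worker builds its own plain dict and Counter and the parent merges the results:

import multiprocessing
from collections import defaultdict, Counter
from functools import partial

def getreads_local(length, genome):
    # Map step: build purely local results, nothing is shared between processes.
    table = defaultdict(list)
    counts = Counter()
    for start in range(0, len(genome)):
        gc = gc_count(genome[start:start+length])  # gc_count as defined in your script
        table[(length, gc)].append(start)
        counts[(length, gc)] += 1
    return table, counts

if __name__ == "__main__":
    # g = ...  # the genome string, as in your script
    pool = multiprocessing.Pool(9)
    results = pool.map(partial(getreads_local, genome=g), range(1, 10))
    pool.close()
    pool.join()

    # Reduce step: merge the per-process tables and counters in the parent.
    merged_table = defaultdict(list)
    merged_counts = Counter()
    for table, counts in results:
        for key, positions in table.items():
            merged_table[key].extend(positions)
        merged_counts.update(counts)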
Going back to your original question...
At the bottom of the Managers section of the multiprocessing documentation there is a relevant note about modifications to mutable values or items in dict and list proxies: they are not propagated through the manager automatically. Basically, you need to re-assign the modified object to the container proxy:
l = table[ (length, gc) ]
l.append( (start) )
table[ (length, gc) ] = l
There is also a relevant Stack Overflow post about combining Pool.map with a shared Array.
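The gist of that pattern, as I read it, is to hand the shared Array to the workers through the pool's initializer rather than as a map argument; a minimal sketch (init_worker, shared_arr and worker are just illustrative names):

import multiprocessing
import ctypes as c
import numpy

def init_worker(arr_):
    # Runs once in each worker: stash the shared Array in a module-level global.
    global shared_arr
    shared_arr = arr_

def worker(i):
    # Wrap the shared buffer in a numpy view; writes land in the parent's memory.
    counts = numpy.frombuffer(shared_arr.get_obj()).reshape(10, 101)
    counts[i, 0] += 1

if __name__ == "__main__":
    mp_arr = multiprocessing.Array(c.c_double, 10 * 101)
    pool = multiprocessing.Pool(9, initializer=init_worker, initargs=(mp_arr,))
    pool.map(worker, range(1, 10))
    pool.close()
    pool.join()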
Taking both into account, you can do something like:
def getreads(length, table, genome):
    genome_len = len(genome)
    # Re-create the numpy view over the shared Array inside the worker;
    # mp_arr is the shared Array created in the main block, inherited by the
    # worker processes when the pool forks.
    arr = numpy.frombuffer(mp_arr.get_obj())
    counts = arr.reshape(10,101)
    for start in range(0,genome_len):
        gc = gc_count(genome[start:start+length])
        # Re-assign the modified list so the change propagates through the proxy.
        l = table[ (length, gc) ]
        l.append( (start) )
        table[ (length, gc) ] = l
        counts[length,gc] +=1

if __name__ == "__main__":
    g = 'ACTACGACTACGACTACGCATCAGCACATACGCATACGCATCAACGACTACGCATACGACCATCAGATCACGACATCAGCATCAGCATCACAGCATCAGCATCAGCACTACAGCATCAGCATCAGCATCAG'
    genome_len = len(g)

    mgr = MyManager()
    mgr.start()
    m = mgr.defaultdict(list)
    mp_arr = multiprocessing.Array(c.c_double, 10*101)
    arr = numpy.frombuffer(mp_arr.get_obj())
    count = arr.reshape(10,101)

    pool = multiprocessing.Pool(9)
    # counts is no longer passed through partial; the workers use mp_arr directly.
    partial_getreads = partial(getreads, table=m, genome=g)
    pool.map(partial_getreads, range(1, 10))
    pool.close()
    pool.join()

    arr = numpy.frombuffer(mp_arr.get_obj())
    count = arr.reshape(10,101)
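With those changes, something like the following should now show non-zero results in the parent after the pool has joined (same m and count as above):

for i in range(1, 10):
    for j in range(0, 101):
        if count[i, j]:
            print('length=%d gc=%d count=%d positions=%d'
                  % (i, j, count[i, j], len(m[(i, j)])))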
Upvotes: 1