Reputation: 15
I'm trying to implement the algorithm from this paper; my implementation is shown below.
import gzip
import numpy as np
k = 16
training_set = np.array([('TCCCTACACT', 9),
('AGTTGGTATT', 12),
('AGTGGATCAC', 8),
('CGCAAGTGTG', 3),
('CTGTTCCCCC', 7),
('CGACTGGTAA', 10),
('CGCCGAGAAG', 4),
('TAGCTACGAC', 5),
('CCTTGCGCGT', 11),
('CCAAAAGAAA', 12),
('CCTCAGGAGG', 6),
('AGGCCACTTA', 5),
('GCGGGAACGG', 5),
('CTATTACCAA', 2),
('ACACTTTTTT', 8),
('GATGCAGCGT', 1)])
test_set = np.array([('GATGCAGCGT', 3),
('GATGCAGCGT', 0),
('GATGCAGCGT', 3),
('GATGCAGCGT', 4)])
for (x1, _) in test_set:
    Cx1 = len(gzip.compress(x1.encode()))
    distance_from_x1 = []
    for (x2, _) in training_set:
        Cx2 = len(gzip.compress(x2.encode()))
        x1x2 = " ".join([x1, x2])
        Cx1x2 = len(gzip.compress(x1x2.encode()))
        ncd = (Cx1x2 - min(Cx1, Cx2)) / max(Cx1, Cx2)
        distance_from_x1.append(ncd)
    sorted_idx = np.argsort(np.array(distance_from_x1))
    top_k_class = list(training_set[sorted_idx[:k], 1])
    predict_class = max(set(top_k_class), key=top_k_class.count)
    print(predict_class)
However, since the computation time explodes with the length of the dataset, I'm trying to use multiprocessing. My intuition says that parallelizing the inner loop would be the biggest time saver. I've seen answers to other questions on multiprocessing nested loops, but none that handle the caveat that other code still has to run outside the inner loop but inside the outer loop.
I've tried this code:
import gzip
import numpy as np
from multiprocessing import Pool, cpu_count
def calculate_ncd(x1, x2):
    Cx1 = len(gzip.compress(x1.encode()))
    Cx2 = len(gzip.compress(x2.encode()))
    x1x2 = " ".join([x1, x2])
    Cx1x2 = len(gzip.compress(x1x2.encode()))
    ncd = (Cx1x2 - min(Cx1, Cx2)) / max(Cx1, Cx2)
    return ncd
def process_train_sample(x1, x2):
    return calculate_ncd(x1, x2)
def process_test_sample(test_sample):
    x1, _ = test_sample
    pool = Pool(cpu_count())
    # one (x1, x2) argument pair per training sample
    distance_from_x1 = pool.starmap(process_train_sample,
                                    zip([x1] * len(training_set),
                                        (x2 for x2, _ in training_set)))
    pool.close()
    pool.join()
    sorted_idx = np.argsort(np.array(distance_from_x1))
    top_k_class = list(training_set[sorted_idx[:k], 1])
    predict_class = max(set(top_k_class), key=top_k_class.count)
    return predict_class
k = 16
training_set = np.array([('TCCCTACACT', 9),
('AGTTGGTATT', 12),
('AGTGGATCAC', 8),
('CGCAAGTGTG', 3),
('CTGTTCCCCC', 7),
('CGACTGGTAA', 10),
('CGCCGAGAAG', 4),
('TAGCTACGAC', 5),
('CCTTGCGCGT', 11),
('CCAAAAGAAA', 12),
('CCTCAGGAGG', 6),
('AGGCCACTTA', 5),
('GCGGGAACGG', 5),
('CTATTACCAA', 2),
('ACACTTTTTT', 8),
('GATGCAGCGT', 1)])
test_set = np.array([('GATGCAGCGT', 3),
('GATGCAGCGT', 0),
('GATGCAGCGT', 3),
('GATGCAGCGT', 4)])
results = [process_test_sample(sample) for sample in test_set]  # classify each test sample (the pool parallelizes the inner loop)
print(results)  # print the predicted classes
But from what I've seen, creating and tearing down a pool in every iteration of the outer loop is inefficient, and the computation time actually increases.
I would like to know how to use a pool for the inner loop without paying the overhead of recreating it on every turn of the outer loop.
EDIT: I've added dummy data since the actual data is hundreds of thousands of nucleotides long.
Upvotes: 1
Views: 74
Reputation: 195573
You can try leveraging shared memory (so the values are not copied back and forth):
import gzip
import numpy as np
from multiprocessing import Pool
from multiprocessing.managers import SharedMemoryManager
from multiprocessing.shared_memory import ShareableList
def do_work(args):
    k = 16
    sl1, sl2, sl3, (x1, _) = args
    # re-attach to the shared lists by name instead of copying their contents
    training_set = ShareableList(name=sl1.shm.name)
    training_set_lengths = ShareableList(name=sl2.shm.name)
    training_set_values = ShareableList(name=sl3.shm.name)
    Cx1 = len(gzip.compress(x1.encode()))
    distance_from_x1 = []
    for x2, Cx2 in zip(training_set, training_set_lengths):
        x1x2 = " ".join([x1, x2])
        Cx1x2 = len(gzip.compress(x1x2.encode()))
        ncd = (Cx1x2 - min(Cx1, Cx2)) / max(Cx1, Cx2)
        distance_from_x1.append(ncd)
    sorted_idx = np.argsort(np.array(distance_from_x1))
    top_k_class = [training_set_values[idx] for idx in sorted_idx[:k]]
    predict_class = max(set(top_k_class), key=top_k_class.count)
    return predict_class
if __name__ == '__main__':
    k = 16
    training_set = [
        ("TCCCTACACT", 9),
        ("AGTTGGTATT", 12),
        ("AGTGGATCAC", 8),
        ("CGCAAGTGTG", 3),
        ("CTGTTCCCCC", 7),
        ("CGACTGGTAA", 10),
        ("CGCCGAGAAG", 4),
        ("TAGCTACGAC", 5),
        ("CCTTGCGCGT", 11),
        ("CCAAAAGAAA", 12),
        ("CCTCAGGAGG", 6),
        ("AGGCCACTTA", 5),
        ("GCGGGAACGG", 5),
        ("CTATTACCAA", 2),
        ("ACACTTTTTT", 8),
        ("GATGCAGCGT", 1),
    ]  # * 50_000
    test_set = np.array(
        [("GATGCAGCGT", 3), ("GATGCAGCGT", 0), ("GATGCAGCGT", 3), ("GATGCAGCGT", 4)]  # * 10
    )
    with SharedMemoryManager() as smm, Pool() as pool:
        # precompute the compressed length of every training sequence once and
        # put the sequences, labels and lengths into shared memory
        training_set_sl = smm.ShareableList([str(x2) for x2, _ in training_set])
        training_set_values = smm.ShareableList([int(val) for _, val in training_set])
        training_set_comp_lengths = smm.ShareableList(
            [len(gzip.compress(x2.encode())) for x2, _ in training_set]
        )
        for i, result in enumerate(
            pool.imap(
                do_work,
                ((training_set_sl, training_set_comp_lengths, training_set_values, t) for t in test_set),
                chunksize=1,
            )
        ):
            print(i, result)
With training_set of length 800_000 and test_set of length 40, I've managed to run it in under one minute (58.7 s on my 8C/16T 5700X, Ubuntu 20.04, Python 3.10.9).
Upvotes: 0
Reputation: 382
If the overhead of multiprocessing outweighs its benefit, don't use it. You mean you want to keep the pool open and reuse it across all outer-loop iterations? That effectively means parallelizing both loops, so why not do exactly that: flatten the work into a grid of (x1, x2) combinations and hand the whole grid to a single pool, which is surely the quickest way to compute it. Alternatively, you could create one pool and have each task for that pool run the entire inner loop for one test sample.
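A minimal sketch of the "grid of combinations" idea, assuming the (sequence, label) layout from the question; the helper name ncd_pair, the chunksize value, and the small k are just illustrative choices, not anything prescribed:

import gzip
from itertools import product
from multiprocessing import Pool

import numpy as np

def ncd_pair(x1, x2):
    # normalized compression distance for a single (x1, x2) pair
    Cx1 = len(gzip.compress(x1.encode()))
    Cx2 = len(gzip.compress(x2.encode()))
    Cx1x2 = len(gzip.compress(" ".join([x1, x2]).encode()))
    return (Cx1x2 - min(Cx1, Cx2)) / max(Cx1, Cx2)

if __name__ == "__main__":
    # a few rows of the question's dummy data; substitute the full datasets
    training_set = [("TCCCTACACT", 9), ("AGTTGGTATT", 12),
                    ("AGTGGATCAC", 8), ("CGCAAGTGTG", 3)]
    test_set = [("GATGCAGCGT", 3), ("GATGCAGCGT", 0)]
    k = 2  # use 16 as in the question once the real training set is plugged in

    test_seqs = [x1 for x1, _ in test_set]
    train_seqs = [x2 for x2, _ in training_set]
    train_labels = [label for _, label in training_set]

    # one pool for the whole run: every (x1, x2) combination is one task,
    # so both loops are parallelized and the pool is created only once
    with Pool() as pool:
        flat = pool.starmap(ncd_pair, product(test_seqs, train_seqs), chunksize=256)

    # reshape the flat result into a (n_test, n_train) distance grid and do
    # the k-NN majority vote per test sample, as in the original code
    distances = np.array(flat).reshape(len(test_seqs), len(train_seqs))
    predictions = []
    for row in distances:
        top_k = [train_labels[i] for i in np.argsort(row)[:k]]
        predictions.append(max(set(top_k), key=top_k.count))
    print(predictions)

The other option (one long-lived pool where each task runs the whole inner loop for a single test sample) is essentially what the shared-memory answer above does.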
Upvotes: 0