DLS

Reputation: 15

How do I apply multiprocessing to a function in the inner loop of a nested loop while still running code in the outer loop?

I'm trying to implement the algorithm from this paper, seen below.

import gzip
import numpy as np

k = 16
training_set = np.array([('TCCCTACACT', 9),
          ('AGTTGGTATT', 12),
          ('AGTGGATCAC', 8),
          ('CGCAAGTGTG', 3),
          ('CTGTTCCCCC', 7),
          ('CGACTGGTAA', 10),
          ('CGCCGAGAAG', 4),
          ('TAGCTACGAC', 5),
          ('CCTTGCGCGT', 11),
          ('CCAAAAGAAA', 12),
          ('CCTCAGGAGG', 6),
          ('AGGCCACTTA', 5),
          ('GCGGGAACGG', 5),
          ('CTATTACCAA', 2),
          ('ACACTTTTTT', 8),
          ('GATGCAGCGT', 1)])
test_set = np.array([('GATGCAGCGT', 3),
           ('GATGCAGCGT', 0),
           ('GATGCAGCGT', 3),
           ('GATGCAGCGT', 4)])
for (x1, _) in test_set:
    Cx1 = len(gzip.compress(x1.encode()))
    distance_from_x1 = []
    for (x2, _) in training_set:
        Cx2 = len(gzip.compress(x2.encode()))
        x1x2 = " ".join([x1, x2])
        Cx1x2 = len(gzip.compress(x1x2.encode()))
        ncd = (Cx1x2 - min(Cx1, Cx2)) / max(Cx1, Cx2)
        distance_from_x1.append(ncd)
    sorted_idx = np.argsort(np.array(distance_from_x1))

    top_k_class = list(training_set[sorted_idx[:k], 1])
    predict_class = max(set(top_k_class), key=top_k_class.count)
    print(predict_class)

However, the computation time explodes with the size of the dataset, so I'm trying to add multiprocessing. My intuition is that parallelizing the inner loop would be the biggest time saver. I've seen answers to other questions about multiprocessing nested loops, but none that cover the case where more code still has to run after the inner loop but inside the outer loop.

I've tried this code:

import gzip
import numpy as np
from multiprocessing import Pool, cpu_count
def calculate_ncd(x1, x2):
    Cx1 = len(gzip.compress(x1.encode()))
    Cx2 = len(gzip.compress(x2.encode()))
    x1x2 = " ".join([x1, x2])
    Cx1x2 = len(gzip.compress(x1x2.encode()))
    ncd = (Cx1x2 - min(Cx1, Cx2)) / max(Cx1, Cx2)
    return ncd

def process_train_sample(x1, x2):
    return calculate_ncd(x1, x2)

def process_test_sample(test_sample):
    x1, _ = test_sample
    pool = Pool(cpu_count())
    distance_from_x1 = pool.starmap(process_train_sample, [(x1, x2) for (x2, _) in training_set])
    pool.close()
    pool.join()
    sorted_idx = np.argsort(np.array(distance_from_x1))
    top_k_class = list(training_set[sorted_idx[:k], 1])
    predict_class = max(set(top_k_class), key=top_k_class.count)
    return predict_class

k = 16
training_set = np.array([('TCCCTACACT', 9),
          ('AGTTGGTATT', 12),
          ('AGTGGATCAC', 8),
          ('CGCAAGTGTG', 3),
          ('CTGTTCCCCC', 7),
          ('CGACTGGTAA', 10),
          ('CGCCGAGAAG', 4),
          ('TAGCTACGAC', 5),
          ('CCTTGCGCGT', 11),
          ('CCAAAAGAAA', 12),
          ('CCTCAGGAGG', 6),
          ('AGGCCACTTA', 5),
          ('GCGGGAACGG', 5),
          ('CTATTACCAA', 2),
          ('ACACTTTTTT', 8),
          ('GATGCAGCGT', 1)])
test_set = np.array([('GATGCAGCGT', 3),
           ('GATGCAGCGT', 0),
           ('GATGCAGCGT', 3),
           ('GATGCAGCGT', 4)])

results = [process_test_sample(sample) for sample in test_set]  # classify each test sample; each call creates its own pool


print(results)  # Print the predicted classes

But from what I've seen, creating and tearing down the pool in every iteration of the outer loop is inefficient, and the computation time actually increases.

I would like to know how to use a pool for the inner loop without having to pay the pool's setup and teardown overhead on every turn of the outer loop.
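Schematically, what I'm imagining is something like this (a rough, untested sketch, reusing the same training_set and test_set defined above): the pool is created once, only the NCD calls of the inner loop are farmed out to it, and the sorting and voting still run in the outer loop:

import gzip
import numpy as np
from multiprocessing import Pool, cpu_count

def calculate_ncd(x1, x2):
    Cx1 = len(gzip.compress(x1.encode()))
    Cx2 = len(gzip.compress(x2.encode()))
    x1x2 = " ".join([x1, x2])
    Cx1x2 = len(gzip.compress(x1x2.encode()))
    return (Cx1x2 - min(Cx1, Cx2)) / max(Cx1, Cx2)

if __name__ == '__main__':
    k = 16
    # training_set and test_set defined exactly as above
    with Pool(cpu_count()) as pool:  # created once, reused for every test sample
        for (x1, _) in test_set:
            # only the inner loop is parallelized ...
            distance_from_x1 = pool.starmap(
                calculate_ncd, [(x1, x2) for (x2, _) in training_set])
            # ... the rest of the outer-loop body runs in the parent process
            sorted_idx = np.argsort(np.array(distance_from_x1))
            top_k_class = list(training_set[sorted_idx[:k], 1])
            predict_class = max(set(top_k_class), key=top_k_class.count)
            print(predict_class)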

EDIT: I've added dummy data since the actual data is hundreds of thousands of nucleotides long.

Upvotes: 1

Views: 74

Answers (2)

Andrej Kesely

Reputation: 195573

You can try to leverage shared memory (to avoid copying the values back and forth):

import gzip
import numpy as np

from multiprocessing import Pool
from multiprocessing.managers import SharedMemoryManager
from multiprocessing.shared_memory import ShareableList


def do_work(args):
    k = 16

    sl1, sl2, sl3, (x1, _) = args
    training_set = ShareableList(name=sl1.shm.name)
    training_set_lengths = ShareableList(name=sl2.shm.name)
    training_set_values = ShareableList(name=sl3.shm.name)

    Cx1 = len(gzip.compress(x1.encode()))

    distance_from_x1 = []
    for x2, Cx2 in zip(training_set, training_set_lengths):
        x1x2 = " ".join([x1, x2])
        Cx1x2 = len(gzip.compress(x1x2.encode()))
        ncd = (Cx1x2 - min(Cx1, Cx2)) / max(Cx1, Cx2)
        distance_from_x1.append(ncd)

    sorted_idx = np.argsort(np.array(distance_from_x1))
    top_k_class = [training_set_values[idx] for idx in sorted_idx[:k]]
    predict_class = max(set(top_k_class), key=top_k_class.count)
    return predict_class


if __name__ == '__main__':
    k = 16
    training_set = [
            ("TCCCTACACT", 9),
            ("AGTTGGTATT", 12),
            ("AGTGGATCAC", 8),
            ("CGCAAGTGTG", 3),
            ("CTGTTCCCCC", 7),
            ("CGACTGGTAA", 10),
            ("CGCCGAGAAG", 4),
            ("TAGCTACGAC", 5),
            ("CCTTGCGCGT", 11),
            ("CCAAAAGAAA", 12),
            ("CCTCAGGAGG", 6),
            ("AGGCCACTTA", 5),
            ("GCGGGAACGG", 5),
            ("CTATTACCAA", 2),
            ("ACACTTTTTT", 8),
            ("GATGCAGCGT", 1),
        ] # * 50_000

    test_set = np.array(
        [("GATGCAGCGT", 3), ("GATGCAGCGT", 0), ("GATGCAGCGT", 3), ("GATGCAGCGT", 4)] # * 10
    )

    with SharedMemoryManager() as smm, Pool() as pool:
        training_set_sl = smm.ShareableList([str(x2) for x2, _ in training_set])
        training_set_values = smm.ShareableList([int(val) for _, val in training_set])
        training_set_comp_lengths = smm.ShareableList([len(gzip.compress(x2.encode())) for x2, _ in training_set])

        for i, result in enumerate(pool.imap(do_work, ((training_set_sl, training_set_comp_lengths, training_set_values, t) for t in test_set), chunksize=1)):
            print(i, result)

With a training_set of length 800_000 and a test_set of length 40 I managed to run it in under one minute (58.7 s on my 8C/16T 5700X, Ubuntu 20.04, Python 3.10.9).

Upvotes: 0

KazM

Reputation: 382

If the overhead of the multiprocessing is too much for it to be worth it, don't use it. Do you mean you want to keep the pool 'open' and reuse it across all outer-loop iterations? That sounds like parallelizing both loops, so why not do exactly that? It is surely the quickest way to calculate the grid of X1, X2 combinations. Alternatively, you could create a single pool and have each task submitted to it perform the whole inner loop.
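For illustration, a rough sketch of the "parallelize both loops" idea (untested, and it assumes the same training_set/test_set layout as in the question): build the full grid of (x1, x2) pairs, score it in a single starmap call, then reshape and vote per test sample:

import gzip
import numpy as np
from multiprocessing import Pool

def ncd(x1, x2):
    # normalized compression distance of one (x1, x2) pair
    Cx1 = len(gzip.compress(x1.encode()))
    Cx2 = len(gzip.compress(x2.encode()))
    Cx1x2 = len(gzip.compress(" ".join([x1, x2]).encode()))
    return (Cx1x2 - min(Cx1, Cx2)) / max(Cx1, Cx2)

if __name__ == '__main__':
    k = 16
    # training_set and test_set as defined in the question
    pairs = [(x1, x2) for (x1, _) in test_set for (x2, _) in training_set]
    with Pool() as pool:
        flat = pool.starmap(ncd, pairs, chunksize=64)
    # one row of distances per test sample
    distances = np.array(flat).reshape(len(test_set), len(training_set))
    for row in distances:
        top_k = list(training_set[np.argsort(row)[:k], 1])
        print(max(set(top_k), key=top_k.count))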

Upvotes: 0
