havij

Reputation: 1180

parallelize a nested loop in python with multiprocessing

I have a nested loop below which, given a long string S of length n and a query Q of length m, checks (naively) whether Q occurs in S with at most max_err mismatches. Since each iteration of the outer loop is independent, it makes sense to parallelize it for speedup (right?).

For example, if S=ATGGTC, Q=TCCT, and max_err=2, then Q is a match, since the substring TGGT of S has <=2 mismatches with Q. But if max_err=1, then Q is not a match in S, because no substring of length 4 in S matches Q with <=1 mismatch.

def is_match(S,n,Q,m,max_err): # lengths; assume n>m
  for i in range(0,n-m+1): # n-m+1 so the last window (starting at n-m) is checked too
    cnt = 0
    for k in range(m):
      if S[i+k]!=Q[k]: cnt += 1
      if cnt>max_err: break
    if cnt<=max_err: return i
  return -1

To parallelize it, I can make the inner loop a function like below. (I moved the return -1 inside this function)

def inner_loop(S,Q,m,i):
    cnt = 0
    for k in range(m):
      if S[i+k]!=Q[k]: cnt += 1
      if cnt>max_err: break
    if cnt<=max_err: return i
    return -1

Now, I need to run the outer loop in parallel, collect the results, and check whether any result is something other than -1. I am stuck here. Is the following code on the right track? How do I collect the results?

def is_match_parallel(S,n,Q,m,max_err):
    inner_loop_1_par = inner_loop(S=S, Q=Q, m=m, i)
    result = []
    pool = multiprocessing.Pool()
    ## the following line is not probably right
    result = pool.map(inner_loop_1_par, range(n-m))
    for i in range(n-m):
        if result[i]!=-1: return i
    return -1

Also, as soon as any of the processes above returns a value other than -1, we are done, and we don't need to wait for the rest of them. Is there any way to do this?

Upvotes: 2

Views: 2820

Answers (1)

atru

Reputation: 4744

Your idea works in a modified form. pool.map() had to be replaced with a slightly more involved approach, because your function takes more parameters than the single item that pool.map() passes from the iterable. Your functions also rely on return values, and those need special treatment in multiprocessing.

The following solution starts a separate Process instance for each starting index and uses a shared variable that indicates whether there was a match for the given conditions:

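from multiprocessing import Process, Value

def inner_loop(flag,S,Q,m,i,max_err):
    # Count mismatches between Q and the window of S starting at i.
    cnt = 0
    # Skip the work entirely if another process has already found a match.
    if flag.value == -1:
        for k in range(m):
            if S[i+k]!=Q[k]:
                cnt += 1
            if cnt>max_err:
                break
        if cnt<=max_err:
            flag.value = 1

def is_match_parallel(S,n,Q,m,max_err):
    # Shared integer: -1 means no match so far, 1 means a match was found.
    flag = Value('i', -1)
    processes = []
    # n-m+1 so that the last window, starting at n-m, is checked too.
    for i in range(0,n-m+1):
        p = Process(target=inner_loop, args=(flag,S,Q,m,i,max_err,))
        processes.append(p)
        p.start()
    for p in processes: p.join()
    return flag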

This solution was written in Python 2.7 but it should be straightforward to implement it in Python 3.X.

is_match_parallel initializes a shared variable, flag, using the special multiprocessing object Value. By default a Value is created with a lock that guards access from multiple processes, so the code does no explicit locking. flag is an integer ("i") indicator of whether Q matches somewhere in S. It is initialized to -1, and inner_loop changes it to 1 only if a match is found.
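To make the Value part more concrete, here is a minimal sketch of how such a shared integer behaves in a single process. It only illustrates the object itself. The `+= 2` step is an arbitrary example of a read-modify-write operation and is not something the solution above needs.

```python
from multiprocessing import Value

# 'i' is the typecode for a signed C int; -1 is the initial value.
flag = Value('i', -1)
print(flag.value)

# A plain assignment such as flag.value = 1 goes through the built-in lock.
# A read-modify-write step is not atomic, so hold the lock explicitly for it.
with flag.get_lock():
    flag.value += 2
print(flag.value)
```

Since inner_loop only ever assigns the constant 1, the explicit get_lock() step is not required there.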

A separate process is started for each starting index i. For large ranges you should benchmark this, because starting one process per index can cost more than the comparison that process performs. In that case, split the loop into larger chunks so that each process handles many indices.

p = Process(target=inner_loop, args=(flag,S,Q,m,i,max_err,)) sets the target function to inner_loop with arguments args, including the shared variable flag. After that the process is started. After the loop, for p in processes: p.join() makes the program wait on that line until all processes have finished running inner_loop.

If a match is found, the process running inner_loop sets the flag to 1 with flag.value = 1. If not, the flag stays at -1, signifying that no matches were found. When all processes are done, is_match_parallel returns the flag as a Value object (return flag.value instead if you only want the integer).

In this current version of inner_loop all processes call the inner_loop but a process will enter the main body of the function - with the loop - only if the flag is still -1. This way, once the flag becomes 1, processes will only race through the function and skip the time consuming part.
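The chunking idea mentioned earlier could be sketched roughly as follows. This is not the solution above: it swaps in a Pool with imap_unordered, assumes Python 3, and the names scan_chunk, make_chunks, is_match_chunked and the chunk_size parameter are made up for illustration. It also stops as soon as any chunk reports a match, so the index it returns is some match, not necessarily the leftmost one.

```python
from multiprocessing import Pool

def scan_chunk(args):
    """Scan window starts in [start, stop); return the first match or -1."""
    S, Q, max_err, start, stop = args
    for i in range(start, stop):
        cnt = 0
        for k in range(len(Q)):
            if S[i + k] != Q[k]:
                cnt += 1
                if cnt > max_err:
                    break
        if cnt <= max_err:
            return i
    return -1

def make_chunks(S, Q, max_err, chunk_size):
    """Split the valid window starts 0..len(S)-len(Q) into consecutive ranges."""
    last = len(S) - len(Q) + 1
    return [(S, Q, max_err, lo, min(lo + chunk_size, last))
            for lo in range(0, last, chunk_size)]

def is_match_chunked(S, Q, max_err, chunk_size=1000):
    chunks = make_chunks(S, Q, max_err, chunk_size)
    with Pool() as pool:
        # Results arrive in completion order, so we can stop at the first hit.
        for hit in pool.imap_unordered(scan_chunk, chunks):
            if hit != -1:
                return hit  # leaving the with-block terminates the other workers
    return -1

if __name__ == '__main__':
    print(is_match_chunked('ATGGTC', 'TCCT', 2, chunk_size=2))
    print(is_match_chunked('ATGGTC', 'TCCT', 1, chunk_size=2))
```

Note that S is sent to the workers once per chunk in this sketch, so for very long strings you would want to benchmark the chunk size.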

Example usages for your inputs are:

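s = 'ATGGTC'
q = 'TCCT'

# The guard is needed on platforms that spawn rather than fork (e.g. Windows).
if __name__ == '__main__':
    res = is_match_parallel(s, len(s), q, len(q), 2)
    print(res.value)

    res = is_match_parallel(s, len(s), q, len(q), 1)
    print(res.value)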

These print 1 and -1, respectively.
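As an aside on the pool.map() point at the top: if you would rather keep pool.map() and return values, one possible way to supply the extra parameters is functools.partial. The sketch below assumes Python 3, and window_matches and is_match_pool are hypothetical names, not part of your code. Unlike the flag version it waits for every window to finish, so there is no early exit.

```python
from functools import partial
from multiprocessing import Pool

def window_matches(i, S, Q, max_err):
    """Return i if S[i:i+len(Q)] has at most max_err mismatches with Q, else -1."""
    cnt = 0
    for k in range(len(Q)):
        if S[i + k] != Q[k]:
            cnt += 1
            if cnt > max_err:
                return -1
    return i

def is_match_pool(S, Q, max_err):
    # Bind the fixed arguments so that pool.map only has to supply i.
    worker = partial(window_matches, S=S, Q=Q, max_err=max_err)
    with Pool() as pool:
        results = pool.map(worker, range(len(S) - len(Q) + 1))
    # pool.map keeps input order, so the first hit is the leftmost match.
    return next((r for r in results if r != -1), -1)

if __name__ == '__main__':
    print(is_match_pool('ATGGTC', 'TCCT', 2))
    print(is_match_pool('ATGGTC', 'TCCT', 1))
```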

Upvotes: 3
