Tingiskhan

Reputation: 1345

Multiprocessing in NumPy

I've scoured the web for solutions to my problem, but haven't really found anything that helps. I want to speed up my program by using multiprocessing. The function getSVJJPrice is itself rather fast, but K has around 1000 elements, so the loop makes the whole thing pretty slow. Is there any way to parallelize the for loop? The code is below.

import numpy as np
import math

def func2min(x, S, expiry, K, r, prices, curr):
    # getSVJJPrice is defined elsewhere in my code
    bid = prices[:, 0]
    ask = prices[:, 1]

    C_omega = [0] * len(K)
    w = [0] * len(K)

    for ind, k in enumerate(K):
        # weight each strike by the inverse of its bid-ask spread
        w[ind] = 1 / np.abs(bid[ind] - ask[ind])
        C_omega[ind] = getSVJJPrice(x[0], (x[1] + x[0]**2) / (2 * x[2]),
                x[2], x[3], x[4], x[5], x[6], x[7], x[8], x[9],
                S[ind], k, r[ind], expiry[ind], curr[ind])

    right = np.sum(w * (np.subtract(C_omega, np.mean(prices, axis=1)))**2)

    print(right)
    #if right < 10:
    #    print('\n \n func = ', right)

    if math.isnan(right):
        right = 1e12

    return right

Thanks a million to whoever looks into this!

Best regards,

Victor

Upvotes: 3

Views: 1999

Answers (1)

pseudocubic

Reputation: 1039

It seems like multiprocessing.Pool might be appropriate for your case, since you are looping over each element of K, and from your code K appears to be just a 1-D array.
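Just to illustrate the basic Pool.map pattern on its own (price_one here is a made-up stand-in for a per-element computation like getSVJJPrice, not your actual function):

from multiprocessing import Pool

def price_one(k):
    # stand-in for a per-element computation such as getSVJJPrice
    return k * k

if __name__ == '__main__':   # guard so worker processes don't re-run this block
    K = range(1000)
    pool = Pool(processes=4)
    results = pool.map(price_one, K)   # results come back in the same order as K
    pool.close()
    pool.join()

pool.map blocks until every element has been processed and returns the results as a list.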

Basically, you first write a function that performs the loop (parallel_loop in my example), and then split the problem into separate chunks. In this case you slice K, together with the arrays that are indexed alongside it, into nprocs pieces, because the worker processes cannot see the local variables of func2min.
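The same slicing scheme is applied to each array; for example, for K alone with made-up numbers, it covers all of K even when len(K) is not a multiple of nprocs:

import numpy as np

K = np.arange(10)
nprocs = 3

# nprocs slices that together cover every element of K exactly once
K_chunks = [K[n * len(K) // nprocs : (n + 1) * len(K) // nprocs]
            for n in range(nprocs)]

print([chunk.tolist() for chunk in K_chunks])   # [[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]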

Then you can use pool.map to run the loop over each chunk in parallel. The results are collected back in the order of the chunks, which is the same order as your original K, since nothing was rearranged, only sliced. Finally, you put the pieces back together into w and C_omega.
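To see the reassembly step in isolation, with made-up numbers for the chunk results:

import numpy as np

# pretend two workers returned these (w_chunk, C_omega_chunk) pairs
outputs = [(np.array([1.0, 2.0]), np.array([10.0, 20.0])),
           (np.array([3.0]), np.array([30.0]))]

# zip(*outputs) regroups the pairs; concatenating each group restores the order
w, C_omega = (np.concatenate(var) for var in zip(*outputs))

print(w)        # [1. 2. 3.]
print(C_omega)  # [10. 20. 30.]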

import numpy as np
import math
from multiprocessing import Pool

def parallel_loop(args):
    # each worker gets the shared parameter vector x plus its own slice of
    # every per-element array, since it cannot see func2min's local variables
    x, K_chunk, S_chunk, r_chunk, expiry_chunk, curr_chunk, bid_chunk, ask_chunk = args

    C_omega_chunk = np.empty(len(K_chunk))
    w_chunk = np.empty(len(K_chunk))

    for ind, k in enumerate(K_chunk):
        w_chunk[ind] = 1 / np.abs(bid_chunk[ind] - ask_chunk[ind])
        C_omega_chunk[ind] = getSVJJPrice(x[0], (x[1] + x[0]**2) / (2 * x[2]),
                x[2], x[3], x[4], x[5], x[6], x[7], x[8], x[9],
                S_chunk[ind], k, r_chunk[ind], expiry_chunk[ind], curr_chunk[ind])

    return (w_chunk, C_omega_chunk)

def func2min(x, S, expiry, K, r, prices, curr, nprocs):
    bid = prices[:, 0]
    ask = prices[:, 1]

    K, S, r, expiry, curr = (np.asarray(a) for a in (K, S, r, expiry, curr))

    # nprocs slices that together cover every index exactly once
    bounds = [(n * len(K) // nprocs, (n + 1) * len(K) // nprocs)
              for n in range(nprocs)]
    chunks = [(x, K[lo:hi], S[lo:hi], r[lo:hi], expiry[lo:hi], curr[lo:hi],
               bid[lo:hi], ask[lo:hi]) for lo, hi in bounds]

    pool = Pool(processes=nprocs)
    outputs = pool.map(parallel_loop, chunks)
    pool.close()
    pool.join()

    # regroup the per-chunk results and glue them back together in order
    w, C_omega = (np.concatenate(var) for var in zip(*outputs))

    right = np.sum(w * (np.subtract(C_omega, np.mean(prices, axis=1)))**2)

    print(right)
    #if right < 10:
    #    print('\n \n func = ', right)

    if math.isnan(right):
        right = 1e12

    return right

Since I don't have an example data set I can't be sure that the above example would work as-is, but I think that it should give you a general idea of how it works.
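Also note that, as with any use of multiprocessing, the code that creates the Pool should run under an if __name__ == '__main__': guard, so that worker processes (which re-import the module on platforms that spawn instead of fork, e.g. Windows) don't re-execute it. A hypothetical call, assuming getSVJJPrice and the functions above live in the same module and with the input arrays made up purely for illustration, might look like:

if __name__ == '__main__':
    # made-up inputs just to show the call signature; replace with your real data
    nprocs = 4
    K = np.linspace(80, 120, 1000)
    S = np.full(len(K), 100.0)
    r = np.full(len(K), 0.02)
    expiry = np.full(len(K), 1.0)
    curr = np.zeros(len(K))
    prices = np.random.rand(len(K), 2)
    x = np.random.rand(10)

    right = func2min(x, S, expiry, K, r, prices, curr, nprocs)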

Upvotes: 2
