Janus Gowda

Reputation: 305

Parallel processing with multiprocessing is slower than sequential

I have code that I need to parallelize. The code itself works without issues. It is a method of a Python class. For example:

class test:
     def __init__(self):
         <...>
     def method(self):
         <...>

I write it like this because the details of the FULL CODE may not be relevant and it is very long. At first I tried to parallelize this code with just two instances:

t1=test()
t2=test()
pr1=Process(target=t1.method, args=())
pr2=Process(target=t2.method, args=())
pr1.start()
pr2.start()
pr1.join()
pr2.join()

But this did not work. Not only was it much slower than running one instance and then the other sequentially, but the class variables were also not modified. The latter issue was solved thanks to the kind answer of @MattDMo in this thread, by creating a shared namespace, shared variables and shared lists with:

import multiprocessing as mp
<...>
self.manager=mp.Manager()
self.shared=self.manager.Namespace()
self.I=self.manager.list([0.0,0.0,0.0,0.0,0.0])
self.shared.V=V

But it is still horribly slow.

At first I thought that, because I was running the code on a laptop with two cores, the two instances saturated both cores and the computer slowed down because it could not perform any other task quickly. So I tried the code on a desktop PC with 6 cores (also a Linux system). That did not solve the problem: the parallelized version is still much slower. Moreover, the CPUs of the desktop computer do not get as hot as they do when I run compiled C code with multi-threading. Does anybody have a clue about what is going on?

The FULL CODE is here, and included below:

from math import exp
from pylab import *
from scipy.stats import norm
from scipy.integrate import ode
from random import gauss,random
from numpy import dot,fft
from time import time

import multiprocessing as mp
from multiprocessing import Pool
from multiprocessing import Process
from multiprocessing import Queue, Pipe
from multiprocessing import Lock, current_process


#Global variables

sec_steps=1000 #resolution (steps per second)
DT=1/float(sec_steps)
stdd=20 #standard deviation for retina random input
stdd2=20 #standard deviation for sigmoid

#FUNCTION TO APPROXIMATE NORMAL CUMULATIVE DISTRIBUTION FUNCTION

def sigmoid(x,mu,sigma):
    beta1=-0.0004406
    beta2=0.0418198
    beta3=0.9
    z=(x-mu)/sigma
    if z>8:
        return 1
    elif z<-8:
        return 0
    else:
        return 1/(1+exp(-sqrt(pi)*(beta1*z**5+beta2*z**3+beta3*z)))

#CLASSES

class retina: ##GAUSSIAN WHITE NOISE GENERATOR
    def __init__(self,mu,sigma):
        self.mu=mu
        self.sigma=sigma
    def create_pulse(self):
        def pulse():
            return gauss(self.mu,self.sigma)
            #return uniform(-1,1)*sqrt(3.)*self.sigma+self.mu
        return pulse
    def test_white_noise(self,N): #test frequency spectrum of random number generator for N seconds
        noise=[]
        pulse=self.create_pulse()
        steps=sec_steps*N+1
        t=linspace(0,N,steps)
        for i in t:
            noise.append(pulse())
        X=fft(noise)
        X=[abs(x)/(steps/2.0) for x in X]
        xlim([0,steps/N])
        xlabel("freq. (Hz)")
        ylabel("Ampl. (V)")
        plot((t*steps/N**2)[1:],X[1:],color='black')
        #savefig('./wnoise.eps', format='eps', dpi=1000)
        show()
        return noise


class cleft: #object: parent class for a synaptic cleft
    def __init__(self):
        self.manager=mp.Manager()
        self.shared=self.manager.Namespace()
        self.shared.preV=0.0 #pre-synaptic voltage
        self.shared.r=0.0 #proportion of channels opened
    Tmax=1.0  #mM
    mu=-35.0  #mV
    sigma=stdd2 #mV

    def T(self): #Receives presynaptic Voltage preV, returns concentration of the corresponding neurotransmitter.
        return self.Tmax*sigmoid(self.shared.preV,self.mu,self.sigma)
    def r_next(self): #Solves kinematic ode  -analytical solution- to find r after one time step DT (needs T and alfa and beta parameters)
        """ 
        runs the ode for one unit of time dt, as specified
        updates the previous r taken as initial condition
        """
        tau=1.0/(self.alfa*self.T()+self.beta)
        r_inf=self.alfa*self.T()*tau
        self.shared.r=r_inf+(self.shared.r-r_inf)*exp(-DT/tau)
    def DI(self,postV): #Receives PSP and computes resulting change in PSC
        return self.g*self.shared.r*(postV-self.restV)

class ampa_cleft(cleft): #Child class for ampa synaptic connection
    def __init__(self):
        self.manager=mp.Manager()
        self.shared=self.manager.Namespace()
        self.shared.preV=0.0
        self.shared.r=0.5 #initial condition for r
        self.alfa=2.0
        self.beta=0.1
        self.restV=0.0
        self.g=0.1


class gaba_a_cleft(cleft): #Child class for GABAa synaptic connection
    def __init__(self):
        self.manager=mp.Manager()
        self.shared=self.manager.Namespace()
        self.shared.preV=0.0
        self.shared.r=0.5
        self.alfa=2.0
        self.beta=0.08
        self.restV=-75.0
        self.g=0.2

class gaba_a_cleft_trnTOtrn(cleft): #Child class for GABAa synaptic connection
    def __init__(self):
        self.manager=mp.Manager()
        self.shared=self.manager.Namespace()
        self.shared.preV=0.0
        self.shared.r=0.5
        self.alfa=2.0
        self.beta=0.08
        self.restV=-75.0
        self.g=0.2

class gaba_a_cleft_inTOin(cleft): #Child class for GABAa synaptic connection
    def __init__(self):
        self.manager=mp.Manager()
        self.shared=self.manager.Namespace()
        self.shared.preV=0.0
        self.shared.r=0.5
        self.alfa=2.0
        self.beta=0.08
        self.restV=-75.0
        self.g=0.2

class gaba_a_cleft_trnTOtcr(cleft): #Child class for GABAa synaptic connection
    def __init__(self):
        self.manager=mp.Manager()
        self.shared=self.manager.Namespace()
        self.shared.preV=0.0
        self.shared.r=0.5
        self.alfa=2.0
        self.beta=0.08
        self.restV=-85.0
        self.g=0.1

class gaba_a_cleft_inTOtcr(cleft): #Child class for GABAa synaptic connection
    def __init__(self):
        self.manager=mp.Manager()
        self.shared=self.manager.Namespace()
        self.shared.preV=0.0
        self.shared.r=0.5
        self.alfa=2.0
        self.beta=0.08
        self.restV=-85.0
        self.g=0.1

class gaba_b_cleft(cleft): #Child class for GABAb synaptic connection
    def __init__(self):
        self.manager=mp.Manager()
        self.shared=self.manager.Namespace()
        self.shared.preV=0.0
        self.shared.r=0.5
        self.shared.R=0.5
        self.shared.X=0.5
        self.alfa_1=0.02
        self.alfa_2=0.03
        self.beta_1=0.05
        self.beta_2=0.01
        self.restV=-100.0
        self.g=0.06

        self.n=4
        self.Kd=100 #Dissociation constant
    def r_next(self): #Solves kinematic ode  SECOND MESSENGER -analytical solution- to find r after one time step DT (needs T and alfa and beta parameters)
        """ 
        runs the ode for one unit of time dt, as specified
        updates the previous r taken as initial condition
        """
        Q1=self.alfa_1*self.T()
        Q2=-Q1-self.beta_1
        R0=self.shared.R
        X0=self.shared.X
        self.shared.R=(Q1*(exp(Q2*DT)-1)+Q2*R0*exp(Q2*DT))/Q2
        self.shared.X=(exp(-self.beta_2*DT)*(self.alfa_2*(self.beta_2*(exp(DT*(self.beta_2+Q2))*(Q1+Q2*R0)+Q1*(-exp(self.beta_2*DT))-Q2*R0)-Q1*Q2*(exp(self.beta_2*DT)-1))+self.beta_2*Q2*X0*(self.beta_2+Q2)))/(self.beta_2*Q2*(self.beta_2+Q2))
        self.shared.r=self.shared.X**self.n/(self.shared.X**self.n+self.Kd)

#######################################################################################################################################################

class neuronEnsemble:
    def __init__(self,V):  #Parent class for a Neuron ensemble
        self.manager=mp.Manager()
        self.shared=self.manager.Namespace()
        self.I=self.manager.list([0.0,0.0,0.0,0.0,0.0])  #Variables to store changes in PSC produced by synaptic connection 
        self.shared.V=V    #Actual state of the membrane potential
    kappa=1.0 #conductance
    def V_next(self):  #solves the ode analytically for a single time step DT
        K1=self.C[0]*self.g/self.kappa
        K2=(-dot(self.C,self.I)+self.C[0]*self.g*self.restV)/self.kappa
        self.shared.V=K2/K1+(self.shared.V-K2/K1)*exp(-K1*DT)

class TCR_neuronEnsemble(neuronEnsemble):
    def __init__(self,V):
        self.manager=mp.Manager()
        self.shared=self.manager.Namespace()
        self.I=self.manager.list([0.0,0.0,0.0,0.0,0.0])  #Variables to store changes in PSC produced by synaptic connection 
        self.shared.V=V    #Actual state of the membrane potential
        self.g=0.01 #conductance of leak
        self.restV=-55.0 #rest of leak
        self.C=(1.0,7.1,1.0/2.0*30.9/4.0,1.0/2.0*3.0*30.9/4.0,1.0/2.0*30.9)     #Cleak,C2,C3,C4,C7!!  #connectivity constants to the ensemble
                        #First one is Cleak, the others in same order as in diagram

class TRN_neuronEnsemble(neuronEnsemble):
    def __init__(self,V):
        self.manager=mp.Manager()
        self.shared=self.manager.Namespace()
        self.I=self.manager.list([0.0,0.0,0.0,0.0,0.0])  #Variables to store changes in PSC produced by synaptic connection 
        self.shared.V=V    #Actual state of the membrane potential
        self.g=0.01 #conductance of leak
        self.restV=-72.5 #rest of leak
        self.C=(1.0,15.0,35.0,0.0,0.0)  #Cleak,C5,C8  #connectivity constants to the ensemble
                        #First one is Cleak, the others in same order as in diagram

class IN_neuronEnsemble(neuronEnsemble): #!!! update all parameters !!!
    def __init__(self,V):
        self.manager=mp.Manager()
        self.shared=self.manager.Namespace()
        self.I=self.manager.list([0.0,0.0,0.0,0.0,0.0])  #Variables to store changes in PSC produced by synaptic connection 
        self.shared.V=V    #Actual state of the membrane potential
        self.g=0.01 #conductance of leak
        self.restV=-70.0 #rest of leak
        self.C=(1.0,47.4,23.6,0.0,0.0)  #Cleak,C1,C6!!  #connectivity constants to the ensemble
                                #First one is Cleak, the others in same order as in diagram
######################################INSTANCE GROUP#################################################################
class group:
    def __init__(self,tcr_V0,trn_V0,in_V0):
        #Declarations of instances
        ####################

        #SYNAPTIC CLEFTS
        self.cleft_ret_in=ampa_cleft() #cleft between retina and IN ensemble
        self.cleft_ret_tcr=ampa_cleft() #cleft between retina and TCR ensemble
        self.cleft_in_in=gaba_a_cleft_inTOin() #cleft between IN and IN ensembles
        self.cleft_in_tcr=gaba_a_cleft_inTOtcr() #cleft between IN and TCR ensembles
        self.cleft_tcr_trn=ampa_cleft() #cleft between TCR and TRN ensembles
        self.cleft_trn_trn=gaba_a_cleft_trnTOtrn() #cleft between TRN and TRN ensembles
        self.cleft_trn_tcr_a=gaba_a_cleft_trnTOtcr() #cleft between TRN and TCR ensembles GABAa
        self.cleft_trn_tcr_b=gaba_b_cleft() #cleft between TRN and TCR ensembles GABAb
        #POPULATIONS    
        self.in_V0=in_V0 #mV i.c excitatory potential
        self.IN=IN_neuronEnsemble(self.in_V0) #create instance of IN ensemble

        self.tcr_V0=tcr_V0 #mV i.c excitatory potential
        self.TCR=TCR_neuronEnsemble(self.tcr_V0) #create instance of TCR ensemble

        self.trn_V0=trn_V0 #mV i.c inhibitory potential
        self.TRN=TRN_neuronEnsemble(self.trn_V0) #create instance of TRN ensemble
    def step(self,p): #makes a step of the circuit for the given instance
        #UPDATE TRN
        self.cleft_tcr_trn.shared.preV=self.TCR.shared.V #cleft takes presynaptic V
        self.cleft_tcr_trn.r_next()   #cleft updates r
        self.TRN.I[2]=self.cleft_tcr_trn.DI(self.TRN.shared.V) #update PSC TCR--->TRN 

        self.cleft_trn_trn.shared.preV=self.TRN.shared.V #cleft takes presynaptic V
        self.cleft_trn_trn.r_next()   #cleft updates r
        self.TRN.I[1]=self.cleft_trn_trn.DI(self.TRN.shared.V) #update PSC TRN--->TRN

        self.TRN.V_next()  #update PSP in TRN

        #record retinal pulse ------|> IN AND TCR
        self.cleft_ret_in.shared.preV=self.cleft_ret_tcr.shared.preV=p

        #UPDATE TCR
        self.cleft_ret_tcr.r_next()      #cleft updates r
        self.TCR.I[1]=self.cleft_ret_tcr.DI(self.TCR.shared.V) #update PSC RET---|> TCR

        self.cleft_trn_tcr_b.shared.preV=self.TRN.shared.V #cleft takes presynaptic V
        self.cleft_trn_tcr_b.r_next()   #cleft updates r
        self.TCR.I[2]=self.cleft_trn_tcr_b.DI(self.TCR.shared.V)  #update PSC

        self.cleft_trn_tcr_a.shared.preV=self.TRN.shared.V #cleft takes presynaptic V
        self.cleft_trn_tcr_a.r_next()   #cleft updates r
        self.TCR.I[3]=self.cleft_trn_tcr_a.DI(self.TCR.shared.V) #update PSC

        self.cleft_in_tcr.shared.preV=self.IN.shared.V #cleft takes presynaptic V
        self.cleft_in_tcr.r_next()   #cleft updates r
        self.TCR.I[4]=self.cleft_in_tcr.DI(self.TCR.shared.V) #update PSC

        self.TCR.V_next()

        #UPDATE IN

        self.cleft_ret_in.r_next()       #cleft updates r
        self.IN.I[1]=self.cleft_ret_in.DI(self.IN.shared.V) #update PSC

        self.cleft_in_in.shared.preV=self.IN.shared.V #cleft takes presynaptic V
        self.cleft_in_in.r_next()   #cleft updates r
        self.IN.I[2]=self.cleft_in_in.DI(self.IN.shared.V)  #update PSC

        self.IN.V_next()
                #----------------------------------------
    def stepN(self, p, N, data_Vtcr, data_Vtrn, data_Vin): #makes N steps, receives a vector of N retinal impulses and output lists
        data_Vtcr.append(self.tcr_V0)
        data_Vtrn.append(self.trn_V0)
        data_Vin.append(self.in_V0)
        for i in xrange(N):
            self.step(p[i])
            data_Vtcr.append(self.TCR.shared.V)     #write to output list
            data_Vtrn.append(self.TRN.shared.V)
            data_Vin.append(self.IN.shared.V)
            name=current_process().name
            print name+" "+str(i)

######################################################################################################################
############################### CODE THAT RUNS THE SIMULATION OF THE MODEL ###########################################
######################################################################################################################

def run(exec_t): 
    """
    runs the simulation for t=exec_t seconds
    """
    t_0=time()
    mu=-45.0 #mV
    sigma=stdd  #20.0 #mV
    ret=retina(mu,sigma) #create instance of white noise generator
    #initial conditions
    tcr_V0=-61.0 #mV i.c excitatory potential
    trn_V0=-84.0 #mV i.c inhibitory potential
    in_V0=-70.0 #mV i.c excitatory potential
    ###########################LISTS FOR STORING DATA POINTS################################
    t=linspace(0.0,exec_t,exec_t*sec_steps+1)
#   data_Vtcr=[]
#   data_Vtcr.append(tcr_V0)
#
#   data_Vtrn=[]
#   data_Vtrn.append(trn_V0)
#
#   data_Vin=[]
#   data_Vin.append(in_V0)
#   ###NUMBER OF INSTANCES
#   N=2
#   pulse=ret.create_pulse()
#   #CREATE INSTANCES
#   groupN=[]
#   for i in xrange(N):
#       g=group(in_V0,tcr_V0,trn_V0)
#       groupN.append(g)
#
#   for i in t[1:]:
#       p=pulse()
#       proc=[]
#       for j in xrange(N):
#           pr=Process(name="group_"+str(j),target=groupN[j].step, args=(p,))
#           pr.start()
#           proc.append(pr)
#       for j in xrange(N):
#           proc[j].join(N)
#
#       data_Vtcr.append((groupN[0].TCR.shared.V+groupN[1].TCR.shared.V)*0.5)     #write to output list
#       data_Vtrn.append((groupN[0].TRN.shared.V+groupN[1].TRN.shared.V)*0.5)
#       data_Vin.append((groupN[0].IN.shared.V+groupN[1].IN.shared.V)*0.5)
#############FOR LOOPING INSIDE INSTANCE ---FASTER#############################################
    #CREATE p vector of retinal pulses
    p=[]
    pulse=ret.create_pulse()
    for k in xrange(len(t)-1):
        p.append(pulse())   
    #CREATE INSTANCES
    N=2
    groupN=[]
    proc=[]

    manager=mp.Manager() #creating a shared namespace

    data_Vtcr_0 = manager.list()
    data_Vtrn_0 = manager.list()
    data_Vin_0  = manager.list()

    data_Vtcr_1 = manager.list()
    data_Vtrn_1 = manager.list()
    data_Vin_1  = manager.list()

    data_Vtcr=[data_Vtcr_0, data_Vtcr_1]
    data_Vtrn=[data_Vtrn_0, data_Vtrn_1]
    data_Vin=[data_Vin_0, data_Vin_1]

    for j in xrange(N):
        g=group(tcr_V0,trn_V0,in_V0)
        groupN.append(g)
    for j in xrange(N):
        pr=Process(name="group_"+str(j),target=groupN[j].stepN, args=(p, len(t)-1, data_Vtcr[j], data_Vtrn[j], data_Vin[j],))
        pr.start()
        proc.append(pr)
    for j in xrange(N):
        proc[j].join()
    data_Vtcr_av=[0.5*i for i in map(add, data_Vtcr[0], data_Vtcr[1])]
    data_Vtrn_av=[0.5*i for i in map(add, data_Vtrn[0], data_Vtrn[1])]
    data_Vin_av =[0.5*i for i in map(add, data_Vin[0],  data_Vin[1])]

    print len(t), len(data_Vtcr[0]), len(data_Vtcr_av)
    ##Plotting#####################################
    subplot(3,1,1)
    xlabel('t')
    ylabel('tcr - mV')
    plot(t[50*sec_steps:],array(data_Vtcr_av)[50*sec_steps:], color='black')

    subplot(3,1,2)
    xlabel('t')
    ylabel('trn - mV')
    plot(t[50*sec_steps:],array(data_Vtrn_av)[50*sec_steps:], color='magenta')

    subplot(3,1,3)
    xlabel('t')
    ylabel('IN - mV')
    plot(t[50*sec_steps:],array(data_Vin_av)[50*sec_steps:], color='red')

    #savefig('./v_tcr.eps', format='eps', dpi=1000)
    ###############################################

    t_1=time() #measure elapsed time
    print "elapsed time: ", t_1-t_0, " seconds."
    #save data to file
    FILE=open("./output.dat","w")
    FILE.write("########################\n")
    FILE.write("# t                                                           V       #\n")
    FILE.write("########################\n")
    for k in range(len(t)):
        FILE.write(str(t[k]).zfill(5)+"\t"*3+repr(data_Vtcr_av[k])+"\n")
    FILE.close()
    #################
    show()
    return t,array(data_Vtcr)

######################################################################################################################
######################################################################################################################
if __name__ == "__main__":
    run(60)    #run simulation for 60 seconds

Upvotes: 3

Views: 4672

Answers (3)

dano

Reputation: 94871

Your problem is that you're relying too heavily on multiprocessing.Manager Proxy objects to do your mathematical calculations. I tried to warn you about this downside to multiprocessing.Manager in my answer to your original question, but my wording wasn't strong enough. I said this:

Just keep in mind that a multiprocessing.Manager starts a child process to manage all the shared instances you create, and that every time you access one of the Proxy instances, you're actually making an IPC call to the Manager process.

I should have added: "And IPC calls are much more expensive than normal access within the same process." Your original question didn't really indicate how extensively you were going to be using the Manager instances, so I didn't think to emphasize it.

Consider this small example that simply reads from one Proxy variable in a loop:

>>> import timeit
>>> timeit.timeit("for _ in range(1000): x = v + 2", setup="v = 0", number=1000)
0.040110111236572266
>>> timeit.timeit("for _ in range(1000): x = shared.v + 2",
...               setup="import multiprocessing ; m = multiprocessing.Manager() ; shared = m.Namespace(); shared.v = 0",
...               number=1000)
15.048354864120483

It's almost 400x slower when you introduce the shared variable. Now, this example is a little extreme because we're accessing the shared variable in a tight loop, but the point stands; accessing the Proxy variables is slow. And you're doing that a lot in your program. The extra overhead of accessing the Proxy is much, much more expensive than what you gain by running two processes simultaneously.

You're going to need to refactor this code significantly to keep usage of Proxy variables to an absolute minimum. You may find more success replacing most of the usages of the Manager's Namespace with multiprocessing.Value objects, which are stored in shared memory rather than in a separate process. This makes them much faster (though still much slower than regular variables):

>>> timeit.timeit("for _ in range(1000): x = v.value + 2", setup="import multiprocessing ; v = multiprocessing.Value('i', 0)", number=1000)
0.29022717475891113

Things get even faster if you initialize it with lock=False:

>>> timeit.timeit("for _ in range(1000): x = v.value + 2", setup="import multiprocessing ; v = multiprocessing.Value('i', 0, lock=False)", number=1000)
0.06386399269104004

but then the Value is no longer automatically process-safe. You'll need to explicitly create and take a multiprocessing.Lock to synchronize access to the variables if they could potentially get changed in both processes simultaneously.
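As a rough illustration of that last point, here is a minimal sketch (Python 3) of an unlocked Value guarded by an explicit Lock. The names add_many and run_workers are made up for this example, and it assumes a Unix-like system where the "fork" start method is available:

```python
import multiprocessing as mp

# Assumption: a Unix-like system where the "fork" start method is available.
ctx = mp.get_context("fork")

def add_many(total, lock, n):
    """Add 1.0 to the shared double n times, holding the lock for each update."""
    for _ in range(n):
        with lock:                 # read-modify-write must not interleave
            total.value += 1.0

def run_workers(n_workers=2, n=10000):
    """Hypothetical driver: start n_workers processes that share one counter."""
    total = ctx.Value("d", 0.0, lock=False)  # raw shared double, no built-in lock
    lock = ctx.Lock()                        # explicit lock shared by all workers
    procs = [ctx.Process(target=add_many, args=(total, lock, n))
             for _ in range(n_workers)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return total.value

if __name__ == "__main__":
    print(run_workers())
```

If each process only ever writes to its own Value, the lock can probably be dropped entirely, which is where the lock=False speedup actually pays off.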

The only other restriction with multiprocessing.Value is that you're limited to types supported by ctypes or the array module. This actually should mostly be ok for you, since you're primarily using ints and floats. The only pieces that you may need to keep as Proxy instances are the lists, though you could probably use a multiprocessing.Array, too.
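To make the overall idea concrete, here is a minimal sketch (Python 3) of the kind of refactoring I mean: each worker keeps its state in plain local variables and only writes results into its own unlocked multiprocessing.Array. The simulate function is a made-up stand-in for your step logic, not your actual model, and the sketch assumes a Unix-like system with the "fork" start method:

```python
import math
import multiprocessing as mp

# Assumption: a Unix-like system where the "fork" start method is available.
ctx = mp.get_context("fork")

def simulate(v0, n_steps, out):
    """Hypothetical stand-in for the per-step update: relax v toward -70.0."""
    v = v0                          # plain local float: no IPC, no locking
    decay = math.exp(-0.001)
    out[0] = v
    for i in range(1, n_steps + 1):
        v = -70.0 + (v + 70.0) * decay
        out[i] = v                  # direct write into shared memory

def run_two(n_steps=1000):
    """Run two workers, each filling its own shared array of doubles."""
    # lock=False is safe here only because each array has a single writer
    outs = [ctx.Array("d", n_steps + 1, lock=False) for _ in range(2)]
    procs = [ctx.Process(target=simulate, args=(v0, n_steps, out))
             for v0, out in zip((-61.0, -84.0), outs)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return [list(out) for out in outs]   # copy results into ordinary lists

if __name__ == "__main__":
    tcr, trn = run_two()
    print(tcr[-1], trn[-1])
```

The parent only touches the shared arrays once, after both workers have finished, so nothing in the inner loop goes through a Manager process.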

Upvotes: 9

Tommy

Reputation: 622

How long does each process run, and how much work does it do? Starting a separate process has overhead: a new child process must be created and the environment of the parent process has to be copied over to it. If you are dealing with very short-lived jobs, this could well be slower than a single-process, single-threaded alternative.

If your individual jobs are short try increasing the amount of work they are doing and see if you get an improvement in speed.
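One way to check this is to time the same job inline and in a child process for a few job sizes. The following is only a sketch (Python 3); work is a made-up CPU-bound job, and it assumes a Unix-like system with the "fork" start method:

```python
import multiprocessing as mp
from time import perf_counter

# Assumption: a Unix-like system where the "fork" start method is available.
ctx = mp.get_context("fork")

def work(n):
    """Hypothetical CPU-bound job: sum of the squares of 0..n-1."""
    return sum(i * i for i in range(n))

def time_inline(n):
    """Seconds needed to run work(n) in the current process."""
    start = perf_counter()
    work(n)
    return perf_counter() - start

def time_one_process(n):
    """Seconds needed to start a child that runs work(n) and wait for it."""
    start = perf_counter()
    p = ctx.Process(target=work, args=(n,))
    p.start()
    p.join()
    return perf_counter() - start

if __name__ == "__main__":
    for n in (10, 10000, 1000000):
        print(n, time_inline(n), time_one_process(n))
```

For small n the child-process time should be dominated by the cost of creating the process, which is why starting one process per simulation step is unlikely to pay off.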

Upvotes: 1

Joshua

Reputation: 2479

Some numeric Python modules, like numpy, can change the CPU affinity of the Python interpreter (which cores the interpreter is allowed to run on). This often occurs when the module is linked against certain multithreaded BLAS libraries. The issue can cause multiple Python processes to run on only one core, making the program slower than the single-threaded version, especially when dealing with shared state.

Check whether your program is using all your cores. If it is not, you can change the CPU affinity back by executing a system call (using os.system) to the taskset command, with the proper arguments.
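As an alternative to shelling out to taskset, on Linux with Python 3.3 or later the affinity can be inspected and reset from within Python. This is a minimal sketch; allowed_cpus and restore_affinity are helper names made up for this example:

```python
import os

def allowed_cpus():
    """Return the set of CPU indices this process may run on, or None if unsupported."""
    if hasattr(os, "sched_getaffinity"):   # only available on some Unix systems
        return os.sched_getaffinity(0)     # 0 means the calling process
    return None

def restore_affinity(cpus):
    """Reset the affinity mask of the calling process to the given CPU set."""
    if cpus and hasattr(os, "sched_setaffinity"):
        os.sched_setaffinity(0, cpus)

if __name__ == "__main__":
    before = allowed_cpus()
    # ... import numpy, scipy, etc. here ...
    after = allowed_cpus()
    if before is not None and after != before:
        restore_affinity(before)           # undo a mask narrowed by an import
    print(before, allowed_cpus())
```

Record the mask before the numeric imports and compare it afterwards; if it shrank, restoring it in the parent before starting the child processes should let them spread over all cores again.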

Upvotes: 1
