Miguel Oliveira

Reputation: 35

Using multiprocessing module in class

I have the following program and I want to use the multiprocessing module. It depends on external files: the PSO class is called from another file, costFunc is a function imported from another file, and the other arguments are just variables.

Swarm is a list containing as many objects as the value of ps, and each object has multiple attributes which need to be updated at every iteration.

Following Hannu's answer, I implemented multiprocessing.Pool and it works, however it takes much more time than running sequentially.

I would appreciate it if you could tell me why this happens and how I can make it run faster.

    # IMPORT PACKAGES -----------------------------------------------------------+
    import random
    import multiprocessing
    from functools import partial
    import numpy as np
    # IMPORT FILES --------------------------------------------------------------+
    from Reducer import initial

    # Particle Class ------------------------------------------------------------+
    class Particle:
        def __init__(self,D,bounds_x,bounds_v):
            self.Position_i = []          # particle position
            self.Velocity_i = []          # particle velocity
            self.Cost_i = -1              # cost individual
            self.Position_Best_i = []     # best position individual
            self.Cost_Best_i = -1         # best cost individual
            self.Constraint_Best_i = []   # best constraints individual
            self.Constraint_i = []        # constraints individual
            self.Penalty_i = -1           # penalty individual
            self.Penalty_Best_i = -1      # best penalty individual

            x0,v0 = initial(D,bounds_x,bounds_v)

            for i in range(0,D):
                self.Velocity_i.append(v0[i])
                self.Position_i.append(x0[i])

        # evaluate current fitness
        def evaluate(self,costFunc,i):
            self.Cost_i, self.Constraint_i,self.Penalty_i = costFunc(self.Position_i,i)

            # check to see if the current position is an individual best
            if self.Cost_i < self.Cost_Best_i or self.Cost_Best_i == -1:
                self.Position_Best_i = self.Position_i
                self.Cost_Best_i = self.Cost_i
                self.Constraint_Best_i = self.Constraint_i
                self.Penalty_Best_i = self.Penalty_i

            return self

    def proxy(gg, costf, i):
        # must return the evaluated particle (not print it),
        # so that results.get() receives the updated objects
        return gg.evaluate(costf, i)


    # Swarm Class ---------------------------------------------------------------+
    class PSO():
        def __init__(self,costFunc,bounds_x,bounds_v,ps,D,maxiter):

            self.Cost_Best_g = -1       # Best Cost for Group
            self.Position_Best_g = []   # Best Position for Group
            self.Constraint_Best_g = []
            self.Penalty_Best_g = -1

            # Establish Swarm
            Swarm = []
            for i in range(0,ps):
                Swarm.append(Particle(D,bounds_x,bounds_v))

            # Begin optimization Loop
            i = 1
            self.Evol = []
            while i <= maxiter:
                pool = multiprocessing.Pool(processes = 4)
                results = pool.map_async(partial(proxy, costf = costFunc, i=i), Swarm)
                pool.close()
                pool.join()

                Swarm = results.get()

                for j in range(ps):
                    if Swarm[j].Cost_i < self.Cost_Best_g or self.Cost_Best_g == -1:
                        self.Position_Best_g = list(Swarm[j].Position_i)
                        self.Cost_Best_g = float(Swarm[j].Cost_i)
                        self.Constraint_Best_g = list(Swarm[j].Constraint_i)
                        self.Penalty_Best_g = float(Swarm[j].Penalty_i)

                self.Evol.append(self.Cost_Best_g)
                i += 1

Upvotes: 2

Views: 546

Answers (1)

Hannu

Reputation: 12205

You need a proxy function to do the function call, and as you need to deliver arguments to the function, you will need partial as well. Consider this:

from time import sleep
from multiprocessing import Pool
from functools import partial

class Foo:
    def __init__(self, a):
        self.a = a
        self.b = None

    def evaluate(self, CostFunction, i):
        xyzzy = CostFunction(i)
        sleep(0.01)
        self.b = self.a*xyzzy
        return self

def CostFunc(i):
    return i*i

def proxy(gg, costf, i):
    return gg.evaluate(costf, i)

def main():
    Swarm = []
    for i in range(0,10):
        nc = Foo(i)
        Swarm.append(nc)

    p = Pool()
    # note: each iteration overwrites results, so only the last
    # map_async call's results are collected below
    for i in range(100, 102):
        results = p.map_async(partial(proxy, costf=CostFunc, i=i), Swarm)
    p.close()
    p.join()

    Swarm = []
    for a in results.get():
        Swarm.append(a)

    for s in Swarm:
        print (s.b)

if __name__ == "__main__":
    main()

This creates a Swarm list of objects, and within each of these objects is an evaluate method, which is the function you need to call. Then we have the parameters (CostFunc and an integer, as in your code).

We will now use Pool.map_async to map your Swarm list to your pool. This gives each worker one instance of Foo from your Swarm list, and we have a proxy function that actually calls evaluate().

However, as map_async only sends one object from the iterable to the target function, instead of passing proxy directly as the target function to the pool, we use partial to create the target function with the "fixed" arguments bound.
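The effect of partial can be sketched in isolation (cost and the bound values here are stand-in names for illustration, not part of the code above):

```python
from functools import partial

def proxy(gg, costf, i):
    return costf(gg) + i

def cost(x):
    return x * x

# partial binds the keyword arguments; the pool then only has to
# supply the one remaining positional argument (a Swarm element)
bound = partial(proxy, costf=cost, i=100)

assert bound(3) == proxy(3, cost, 100)  # 3*3 + 100 = 109
```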

And as you apparently want to get the modified objects back, this requires another trick. If you modify the target object in a Pool worker, you only modify the worker's local copy, which is thrown away as soon as processing completes. There is no way for the subprocess to modify the main process's memory directly (or vice versa); the processes have separate address spaces.

Instead, after modifying the object, we return self. When your pool has completed its work, we discard the old Swarm and reassemble it from the returned result objects.
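Applied back to the question's loop, this pattern also hints at why the parallel version can be slower: the original code creates a fresh Pool on every while iteration, paying worker startup and pickling costs each time around. A sketch of the difference, assuming pool creation is the dominant overhead (work, per_iteration_pool and single_pool are illustrative names, not from the code above):

```python
from functools import partial
from multiprocessing import Pool

def work(x, scale):
    return x * scale

def per_iteration_pool(iters):
    # anti-pattern: a fresh Pool per iteration pays worker startup
    # costs every time around the loop
    total = 0
    for i in range(iters):
        with Pool(4) as p:
            total += sum(p.map(partial(work, scale=i), range(8)))
    return total

def single_pool(iters):
    # create the Pool once and reuse it across iterations
    total = 0
    with Pool(4) as p:
        for i in range(iters):
            total += sum(p.map(partial(work, scale=i), range(8)))
    return total

if __name__ == "__main__":
    # both compute the same result; single_pool avoids the repeated
    # pool startup overhead
    assert per_iteration_pool(3) == single_pool(3)
```

Also note that if the real costFunc is very cheap, the pickling and inter-process communication per call can outweigh the parallel speedup entirely, regardless of how the pool is managed.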

Upvotes: 1
