Adam

Reputation: 332

python apply_async does not call method

I have a method that needs to process a large database, which would take hours or days to dig through. The arguments are stored in a (long) list, of which at most X should be processed in one batch. The method does not need to return anything, yet I return "True" for "fun"...

The function works perfectly when I iterate through it linearly (generating/appending the results in other tables not seen here), yet I am unable to get apply_async or map_async to work (they worked before in other projects). Any hint about what I might be doing wrong would be appreciated. Thanks in advance! See the code below:

    import multiprocessing as mp

    class mainClass:
        #loads of stuff

    def main():
        multiprocess = True
        batchSize = 35

        mC = mainClass()
        while True:
            toCheck = [key for key, value in mC.lCheckSet.items()] #the tasks are stored in a dictionary, I'm referring to them with their keys, which I turn to a list here for iteration.
            if multiprocess == False:
                #this version works perfectly fine
                for i in toCheck[:batchSize]:
                    mC.check(i)
            else:
                #the async version does not, either with apply_async...
                with mp.Pool(processes = 8) as pool:
                    temp = [pool.apply_async(mC.check, args=(toCheck[n],)) for n in range(len(toCheck[:batchSize]))]
                    results = [t.get() for t in temp]

                #...or as map_async
                pool = mp.Pool(processes = 8)
                temp = pool.map_async(mC.check, toCheck[:batchSize])
                pool.close()
                pool.join()

    if __name__=="__main__":
        main()

Upvotes: 6

Views: 7878

Answers (2)

Rogelio Triviño

Reputation: 6519

I got to this question with the same problem: my apply_async calls were not being called at all. In my case, the reason was that the number of arguments passed in the apply_async call differed from the number of parameters in the function declaration.
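A mismatch like this fails quietly because the exception raised in the worker is stored on the AsyncResult and only re-raised when .get() is called on it. Here is a minimal sketch of surfacing such errors. The names check and run_batch are hypothetical, made up for illustration, and are not taken from the question:

```python
import multiprocessing as mp


def check(key, extra):
    # Hypothetical worker that expects two arguments.
    return (key, extra)


def run_batch(pool, func, arg_tuples):
    """Submit every call, then collect results and errors separately."""
    pending = [pool.apply_async(func, args) for args in arg_tuples]
    results, errors = [], []
    for job in pending:
        try:
            # .get() re-raises whatever exception the worker hit.
            results.append(job.get(timeout=30))
        except Exception as exc:
            errors.append(exc)
    return results, errors


if __name__ == "__main__":
    with mp.Pool(processes=2) as pool:
        # The second call passes one argument too few.
        results, errors = run_batch(pool, check, [(1, "a"), (2,)])
    print("results:", results)
    print("errors:", [repr(e) for e in errors])
```

If the AsyncResult objects are never asked for their result (as with a map_async call that is only followed by close() and join()), a failure like this leaves no trace, which can look exactly like the method not being called.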

Upvotes: 2

jsbueno

Reputation: 110261

The "smell" here is that you are instantiating your mainClass in the main process, just once, and then trying to call a method on it in the different processes. Note that when you pass mC.check to your process pool, it is a method already bound to the instance created in this process.
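One practical consequence: to send a bound method to a worker, the pool has to pickle the instance it is bound to, so everything stored on that instance must be picklable. A rough way to test that in the main process is sketched below. PlainState, LockedState and can_pickle are hypothetical names, and the lock only stands in for whatever unpicklable state (connections, file handles and so on) a real class might hold:

```python
import pickle
import threading


class PlainState:
    def __init__(self):
        self.value = 1

    def check(self, arg):
        return (self.value, arg)


class LockedState(PlainState):
    def __init__(self):
        super().__init__()
        # Hypothetical stand-in for a DB connection, file handle, etc.
        self.lock = threading.Lock()


def can_pickle(obj):
    """Return (True, None) if obj pickles, else (False, the exception)."""
    try:
        pickle.dumps(obj)
        return True, None
    except Exception as exc:
        return False, exc


if __name__ == "__main__":
    # Pickling a bound method pickles the instance behind it.
    print(can_pickle(PlainState().check))
    print(can_pickle(LockedState().check))
```

Running can_pickle(mC.check) on the real object before handing it to the pool should show whether the instance can make the trip at all.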

I'd guess that is where your problem lies. That said, the approach itself can work: I made this simplified version, and it works as intended:

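    import multiprocessing as mp
    import random, time

    class MainClass:
        def __init__(self):
            self.value = 1
        def check(self, arg):
            time.sleep(random.uniform(0.01, 0.3))
            # id(self) and self.value show which instance handled the call
            print(id(self), self.value, arg)

    def main():
        mc = MainClass()

        with mp.Pool(processes=4) as pool:
            # mc.check is a bound method, so mc is pickled and sent with each task
            temp = [pool.apply_async(mc.check, (i,)) for i in range(8)]
            # .get() waits for each task and re-raises any exception from the worker
            results = [t.get() for t in temp]

    # The guard keeps worker processes from re-running main() on platforms that spawn
    if __name__ == "__main__":
        main()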

(Have you tried just adding some prints to make sure the method is not running at all?) So the problem likely lies in some complex state in your mainClass that does not make it to the worker processes intact. A possible workaround is to instantiate your mainClass inside each process. That is easy to do, since multiprocessing lets you get the current process with current_process() and use that object as a namespace to keep data in each worker of the Pool across different calls to apply_async.

So, create a new check function like the one below and, instead of instantiating your mainClass in the main process, instantiate it inside each process in the pool:

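    import multiprocessing as mp
    import random, time

    def check(arg):
        # Each worker keeps its own MainClass instance as an attribute of its
        # Process object: created on the first call, reused on later calls.
        process = mp.current_process()
        if not hasattr(process, "main_class"):
            process.main_class = MainClass()
        process.main_class.check(arg)


    class MainClass:
        def __init__(self):
            # A random value makes the per-process instances easy to tell apart
            self.value = random.randrange(100)
        def check(self, arg):
            time.sleep(random.uniform(0.01, 0.3))
            print(id(self), self.value, arg)

    def main():
        # No MainClass is created here; each worker builds its own inside check()
        with mp.Pool(processes=2) as pool:
            temp = [pool.apply_async(check, (i,)) for i in range(8)]
            results = [t.get() for t in temp]

    if __name__ == "__main__":
        main()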

Upvotes: 4
