Pookie

Reputation: 1279

pool.map freezing when object is large

I am using pool.map to populate a dictionary called nodes. To be clear: this dictionary is populated after pool.map runs, so sharing the variable between processes is not a concern. Everything the function returns and everything in the dictionary IS picklable. The dictionary being populated is essentially a graph. When I go 1, 2, or 3 levels deep into populating this graph, the program runs flawlessly. However, at 4 deep the program doesn't seem to crash, it just freezes. I set up print statements in the function I am mapping, and at the very end of its run it prints the statement at the very top of the function and then freezes. Here is how I am calling pool.map:

    currentNode = startingNode
    nodesPopulated = [currentNode]
    connections = []
    merger = []
    pool = Pool(cpu_count())
    for currentDepth in range(1, depth):
        print('=' * 70)
        print("=  At depth", currentDepth)
        # Collect the outgoing connections of every node populated so far.
        connections = []
        for item in nodesPopulated:
            if item is not None and item.isPopulated():
                connections += list(item.getConnections().values())
        print("=  Current number of connections:", len(connections))
        print("=  Current number of NodesPopulated in this iteration: ", len(nodesPopulated))
        print("=  Total number of nodes", len(self.nodes.keys()))
        nodesPopulated = pool.map(self.populateTopicNode, connections)
        print('\n=  Successfully populated another round of nodes')
        # Merge the freshly populated nodes back into the graph.
        for node in nodesPopulated:
            if node is not None and node.isPopulated():
                self.nodes[node.getTopic().getName()] = node
                #self.populatedNodes[node.getTopic().getName()] = True;
        print('=  Updated self.nodes\n')

    pool.close()
    pool.join()

    print('\nCount = ', len(list(self.nodes.keys())))
    return

Once again, I make sure everything returned into nodesPopulated is picklable. I am at my wits' end because running this program 4 deep takes about 2 hours, and without pool.map it works flawlessly but takes about 6 hours. I don't want to ditch multiprocessing, but I can't figure this out and it takes forever to debug. The last thing it prints before freezing is 'D', which is at the top of self.populateTopicNode. I also suspect that an object getting too large (either self.nodes or connections) may be why this is freezing.
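
As a sanity check, one way to verify that claim is to round-trip each result through pickle before trusting pool.map with it. This is a sketch; assert_picklable is a hypothetical helper, not part of the original code:

import pickle

def assert_picklable(obj):
    # Round-trips obj through pickle; raises (e.g. pickle.PicklingError or
    # TypeError) if obj, or anything it references, cannot be pickled.
    pickle.loads(pickle.dumps(obj))
    return obj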

Note: I am certain this is a multiprocessing issue, because I ran this exact code with pool.map replaced by a for loop and it ran to completion without error. So something is causing pool.map to freeze. There is no error message; it just gets hung at the very first reference to the parameter of the function. Here are the first few lines of populateTopicNode:

def populateTopicNode(self, node: TopicNode):
    print('D')
    if node.isPopulated():
        return None

The last thing seen on the console before freezing is 'D'.

EDIT: I did some tests to give you the exact numbers of when it hangs: [screenshot of the test output omitted]

It hangs using about 1300 MB of memory.

EDIT2:

Okay, so I found out that it IS returning something, not just hanging randomly. It returns None and then hangs. I am unsure why, because there are plenty of times when it returns None and works fine. I also wrapped my function in a try/except to see whether an exception being returned to the parent was the problem, and it isn't: no exceptions are being caught, and it IS running to the point where it returns. It just hangs after returning.

EDIT3:

It breaks at the exact same spot every single run. I print the name of the current Topic being processed, and it always breaks on the same Topic at the same line and then hangs. I am unsure if that helps, but it is additional information: it consistently breaks at the exact same point.

Upvotes: 9

Views: 2727

Answers (2)

Hielke Walinga

Reputation: 2845

I recently had a case where I needed to improve the speed of multiprocessing. I managed to do so by using an iterator-based approach and imap instead of map. imap takes an iterator and iterates over it lazily, while map first converts its second argument to a list and then passes it to its workers. This might be the bottleneck in your code, but I am not a hundred percent sure.

Using imap and an iterator will at least save memory, perhaps increase speed, and maybe solve your hang.

I propose something like this:

from itertools import chain

connections = chain.from_iterable(map(
        lambda item: item.getConnections().values(),
        filter(lambda item: item is not None and item.isPopulated(), nodesPopulated)
        ))

pool.imap(self.populateTopicNode, connections, chunksize=1024)

Note: You should test whether I really converted connections to an iterator correctly.

Note2: pool.imap actually returns an iterator. The call itself is therefore non-blocking, but it will block (wait for output) as you iterate over its values in the for loop. This may or may not be what you want, depending on possible side effects of your functions. Otherwise, convert it to a list before looping.
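
As a sketch, reusing the names from the question and the chunksize from above, the lazy and eager styles look like this (the two alternatives consume the same iterator, so pick one):

# Lazy: the loop blocks as each result becomes available.
for node in pool.imap(self.populateTopicNode, connections, chunksize=1024):
    if node is not None and node.isPopulated():
        self.nodes[node.getTopic().getName()] = node

# Eager: force all results first (closest to the original pool.map behaviour).
nodesPopulated = list(pool.imap(self.populateTopicNode, connections, chunksize=1024))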

Note3: chunksize is the size of each chunk that is sent to a worker. If it is too low, too many requests for a new chunk are sent to the main process, creating a bottleneck. If it is too high, you may push too much content to the workers in one go, or end up with idle workers at the end while one or two workers are still busy with their chunk. If your function takes roughly equal time for all tasks, the best chunksize is the number of tasks divided by the number of workers (CPUs); that way each worker requests a chunk only once. chunksize is also a parameter of pool.map, so maybe that already helps your code as it stands.
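
A minimal sketch of that heuristic, assuming the tasks can be counted cheaply (materialising the iterator gives up the memory saving, so only do this when counting is affordable):

from multiprocessing import cpu_count

tasks = list(connections)  # materialise once so the tasks can be counted
chunksize = max(1, len(tasks) // cpu_count())
results = pool.imap(self.populateTopicNode, tasks, chunksize=chunksize)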

Note4: There is also imap_unordered, which does the same, but the returned iterator yields each output as soon as it is ready, and is therefore "unordered". It might be interesting to use instead, but in my case the whole iteration somehow took more time than plain imap.

Note5: Like I said, I have no clue why your program hangs; this approach helped speed up my program, but yours might hang for other reasons. noxdafox's approach is probably a better fix, since it really rules out the main process as your bottleneck, but if this approach works it would be the more elegant solution, I think.

Upvotes: 3

noxdafox

Reputation: 15040

From the multiprocessing guidelines:

As far as possible one should try to avoid shifting large amounts of data between processes.

multiprocessing.Pool relies on a locked buffer (an OS pipe) to distribute the tasks among the workers and retrieve their results. If an object larger than the buffer is pushed through the pipe, there is a chance the logic will hang.

I'd suggest dumping the jobs to files (using pickle, for example) and sending the filenames to the child processes. That way each process can retrieve the data independently. Not only do you prevent your logic from getting stuck, but you will also notice speed improvements, as the pipe becomes a severe bottleneck in your design.
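
A minimal sketch of that idea, with illustrative helper names (dump_job and run_job are not an existing API) and assuming a module-level version of the mapped function:

import os
import pickle
import tempfile
from multiprocessing import Pool, cpu_count

def dump_job(obj):
    # Write one job to its own temporary file and return the filename.
    fd, path = tempfile.mkstemp(suffix='.pickle')
    with os.fdopen(fd, 'wb') as f:
        pickle.dump(obj, f)
    return path

def run_job(path):
    # Worker: load the job from disk, so only a short string crosses the pipe.
    with open(path, 'rb') as f:
        node = pickle.load(f)
    os.remove(path)
    return populateTopicNode(node)  # hypothetical module-level function

paths = [dump_job(c) for c in connections]
with Pool(cpu_count()) as pool:
    nodesPopulated = pool.map(run_job, paths)

If the results are also large, the same trick applies on the way back: have the worker pickle its result to a file and return just that filename.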

Upvotes: 7
