Lostsoul

Reputation: 25999

multiprocessing pool hanging and unable to break out of app

I'm sure this is a rookie mistake, but I can't figure out what I'm doing wrong with multiprocessing. I have this code (which just sits around and does nothing):

if __name__ == '__main__':
    pool = Pool(processes=4)  
    for i, x in enumerate(data): 
        pool.apply_async(new_awesome_function, (i, x))
    pool.close()
    pool.join()

data is a list ([1,2,3,4,5]) and I'm trying to send each item to be processed over multiple CPUs, but when I wrap my working command in a function and run it with the code above, nothing happens (when I call the function itself without the code above, it works fine). So I think I'm using multiprocessing wrong (although I took the examples from sites). Any suggestions?

Update: I noticed that I can't even break out of it with Ctrl-C when it freezes... that always works to get out of my buggy programs. I looked at python2.5 multiprocessing Pool and tried to follow the advice there, adding the import inside my if statement, but no luck.

Update2: I'm sorry, just realized thanks to the answer below that the command works, but it doesn't seem to terminate the program or let me force quit.

Upvotes: 4

Views: 9531

Answers (3)

Winston Ewert

Reputation: 45039

I don't know what database you are using, but chances are you can't share database connections between your processes like that.

On Linux, fork() is used, which makes a copy of everything in memory when you start the subprocess. However, things like sockets, open files, and database connections won't work properly unless specifically designed to do so.

On Windows, fork() is unavailable, so multiprocessing re-imports your script in each subprocess. In your case that would be really bad, because it would drop everything again. You can prevent that by doing the drop inside the if __name__ == '__main__': block.

You should be able to reopen the database connections inside new_awesome_function and thus successfully interact with the database.
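The pattern looks roughly like this; a hedged sketch using sqlite3 as a stand-in for whatever database is actually involved (the `example.db` path and `items` table are made up for illustration):

```python
from multiprocessing import Pool
import sqlite3

DB = "example.db"  # hypothetical path, stands in for your real database

def new_awesome_function(i, x):
    # open a fresh connection *inside* the worker -- connections created
    # in the parent generally don't survive the trip to a child process
    conn = sqlite3.connect(DB)
    conn.execute("CREATE TABLE IF NOT EXISTS items (i INTEGER, x INTEGER)")
    conn.execute("INSERT INTO items VALUES (?, ?)", (i, x))
    conn.commit()
    conn.close()

def run(data):
    with Pool(processes=4) as pool:
        # starmap unpacks each (i, x) pair into the function's arguments
        pool.starmap(new_awesome_function, enumerate(data))

if __name__ == '__main__':
    run([1, 2, 3, 4, 5])
```

Each worker pays the cost of opening its own connection, but nothing is shared across process boundaries, which is the point.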

Truth be told, you aren't going to gain any speed doing this. In fact, I expect it to be slower. Databases are really, really slow: your process is going to spend most of its time waiting for the database, and having multiple processes waiting for the database will not improve the situation.

But databases are for storing things. Any processing should really happen in your code before hitting the database. You are basically using the database as a set, and your code would be much nicer using a Python set. If you really need to put that stuff in a database, do it at the end of your program.
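That idea can be sketched like this; the processing step and the schema are placeholders, and sqlite3 only stands in for the real database:

```python
import sqlite3

def process(items):
    seen = set()          # in-memory membership test replaces DB lookups
    results = []
    for item in items:
        if item in seen:  # O(1) check, no database round trip
            continue
        seen.add(item)
        results.append(item * 2)  # placeholder for the real processing

    # hit the database once, at the end, with a single bulk insert
    conn = sqlite3.connect(":memory:")  # illustrative; use your real DB here
    conn.execute("CREATE TABLE results (value INTEGER)")
    conn.executemany("INSERT INTO results VALUES (?)",
                     [(r,) for r in results])
    conn.commit()
    conn.close()
    return results

print(process([1, 2, 2, 3, 1]))  # [2, 4, 6] -- duplicates skipped
```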

Upvotes: 2

Winston Ewert

Reputation: 45039

Multiprocessing isn't threading.

You're probably doing something sorta like this:

data = {}

def new_awesome_function(a, b):
    data[a] = b

After you run the script, data has not changed. This is because multiprocessing uses copies of your program. Your functions are being run, but they are run in copies of your program and thus have no effect on your original program.
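A quick way to convince yourself; a small sketch (not the question's actual code):

```python
from multiprocessing import Pool

data = {}

def new_awesome_function(a, b):
    data[a] = b  # mutates the *worker's* copy of the dict

if __name__ == '__main__':
    with Pool(processes=2) as pool:
        pool.starmap(new_awesome_function, [(0, 'x'), (1, 'y')])
    print(data)  # {} -- the parent's dict is untouched
```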

In order to make use of multiprocessing you need to explicitly communicate from one process to another. With threading everything is shared, but with multiprocessing nothing is shared unless you explicitly share it.

The simplest way is to use return values:

    from multiprocessing import Pool

    def new_awesome_function(a, b):
        return a + b

    if __name__ == '__main__':
        pool = Pool(processes=4)
        result = pool.apply_async(new_awesome_function, (1, 2))
        # later...
        value = result.get()  # 3
        pool.close()
        pool.join()

See the Python documentation: http://docs.python.org/library/multiprocessing.html for other methods such as Queues, Pipes, and Managers. What you can't do is mutate your program's state in a worker and expect that to show up in the parent.
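For example, a Manager gives you a dict proxy that really is shared between processes; a sketch (the `x * 10` step is just a placeholder):

```python
from multiprocessing import Pool, Manager

def new_awesome_function(i, x, shared):
    # writes go through the manager process, so every process sees them
    shared[i] = x * 10

def run(data):
    with Manager() as manager:
        shared = manager.dict()  # proxy object, safe to pass to workers
        with Pool(processes=4) as pool:
            pool.starmap(new_awesome_function,
                         [(i, x, shared) for i, x in enumerate(data)])
        return dict(shared)  # plain copy of the shared dict

if __name__ == '__main__':
    print(sorted(run([1, 2, 3, 4, 5]).items()))
    # [(0, 10), (1, 20), (2, 30), (3, 40), (4, 50)]
```

The manager routes every read and write through a separate server process, so it's slower than return values, but it works when workers genuinely need to see each other's updates.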

Upvotes: 3

Rik Poggi

Reputation: 29302

Your code seems to work for me:

from multiprocessing import Pool
import time

def new_awesome_function(a,b):
    print(a,b, 'start')
    time.sleep(1)
    print(a,b, 'end')

if __name__ == '__main__':
    data = [1,2,3,4,5]
    pool = Pool(processes=4)
    for i, x in enumerate(data): 
        pool.apply_async(new_awesome_function, (i, x))
    pool.close()
    pool.join()

gave me:

0 1 start
1 2 start
2 3 start
3 4 start
1 2 end
0 1 end
4 5 start
2 3 end
3 4 end
4 5 end

What makes you think it doesn't work?


Edit: Try to run this and look at the output:

from multiprocessing import Pool
import time

def new_awesome_function(a,b):
    print(a,b, 'start')
    time.sleep(1)
    print(a,b, 'end')
    return a + b

if __name__ == '__main__':
    data = [1,2,3,4,5]
    pool = Pool(processes=4)
    results = []
    for i, x in enumerate(data):
        r = pool.apply_async(new_awesome_function, (i, x))
        results.append((i,r))
    pool.close()
    already = []
    while len(already) < len(data):
        for i,r in results:
            if r.ready() and i not in already:
                already.append(i)
                print(i, 'is ready!')
    pool.join()

Mine is:

0 1 start
1 2 start
2 3 start
3 4 start
0 1 end
4 5 start
1 2 end
2 3 end
0 is ready!
3 4 end
1 is ready!
2 is ready!
3 is ready!
4 5 end
4 is ready!

Upvotes: 2
