Geeocode

Reputation: 5797

Python multiprocessing doesn't nest subprocess?

I have code that looks something like this:

import multiprocessing
import logging
import time

m = [0,1,2,3]
iter_state = 0

class gener(object):
    def __init__(self, m):
        self.m = m
        self.c = 0

    def __iter__(self):
        return self

    def next(self):
        print "n"
        time.sleep(3)
        ret = self.m[self.c]
        self.c += 1
        return ret 

tt  = gener(m)

itst = multiprocessing.Array('i', 3)

def gen(t):
    itst[t] = t

multiprocessing.log_to_stderr(logging.DEBUG)    

tm = time.time()

job1 = multiprocessing.Process(target=gen, args=(tt.next(),))
job2 = multiprocessing.Process(target=gen, args=(tt.next(),))
job3 = multiprocessing.Process(target=gen, args=(tt.next(),))


job1.start()
job2.start()
job3.start()
job1.join()
job2.join()
job3.join()  # wait for all three children before reading itst

for i in itst: 
    print i

tm = time.time() - tm
print tm

With the following output:

OUT:

n
n
n
[INFO/Process-1] child process calling self.run()
[INFO/Process-1] process shutting down
[DEBUG/Process-1] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-1] running the remaining "atexit" finalizers
[INFO/Process-1] process exiting with exitcode 0
[INFO/Process-2] child process calling self.run()
[INFO/Process-2] process shutting down
[DEBUG/Process-2] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-2] running the remaining "atexit" finalizers
[INFO/Process-2] process exiting with exitcode 0
[INFO/Process-3] child process calling self.run()
[INFO/Process-3] process shutting down
[DEBUG/Process-3] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-3] running the remaining "atexit" finalizers
[INFO/Process-3] process exiting with exitcode 0
0
1
2
9.01742887497
[INFO/MainProcess] process shutting down
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
[DEBUG/MainProcess] running the remaining "atexit" finalizers

The three 3-second sleeps run one after another (about 9 seconds in total), so nothing is actually parallelized.

However, when the time.sleep() call is placed inside of the gen() function, we see this:

import multiprocessing
import logging
import time

m = [0,1,2,3]
iter_state = 0

class gener(object):
    def __init__(self, m):
        self.m = m
        self.c = 0

    def __iter__(self):
        return self

    def next(self):
        print "n"
        ret = self.m[self.c]
        self.c += 1
        return ret 

tt  = gener(m)

itst = multiprocessing.Array('i', 3)

def gen(t):
    time.sleep(3)
    itst[t] = t

multiprocessing.log_to_stderr(logging.DEBUG)    

tm = time.time()

job1 = multiprocessing.Process(target=gen, args=(tt.next(),))
job2 = multiprocessing.Process(target=gen, args=(tt.next(),))
job3 = multiprocessing.Process(target=gen, args=(tt.next(),))


job1.start()
job2.start()
job3.start()
job1.join()
job2.join()
job3.join()  # wait for all three children before reading itst

for i in itst: 
    print i

tm = time.time() - tm
print tm

OUT:

n
n
n
[INFO/Process-1] child process calling self.run()
[INFO/Process-2] child process calling self.run()
[INFO/Process-3] child process calling self.run()
[INFO/Process-1] process shutting down
[DEBUG/Process-1] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-1] running the remaining "atexit" finalizers
[INFO/Process-1] process exiting with exitcode 0
[INFO/Process-3] process shutting down
[DEBUG/Process-3] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-3] running the remaining "atexit" finalizers
[INFO/Process-3] process exiting with exitcode 0
[INFO/Process-2] process shutting down
[DEBUG/Process-2] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-2] running the remaining "atexit" finalizers
[INFO/Process-2] process exiting with exitcode 0
0
1
2
3.01985812187
[INFO/MainProcess] process shutting down
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
[DEBUG/MainProcess] running the remaining "atexit" finalizers

Now we can see that parallelization works fine!

But unfortunately, this only parallelizes the work done inside the gen() function.

My question:

How can I get the work done inside the next() method to run in parallel as well?

Upvotes: 1

Views: 1076

Answers (1)

dano

Reputation: 94891

You can do it by calling next() only inside gen, that is, in the child process. Right now, tt.next() is evaluated in the parent process while the Process objects are being constructed, so the three sleeps run one after another before any child has started, and only the return value is passed to the child. Instead, pass the bound method tt.next itself to the child. Note that once you do that, gener.c has to become a process-shared variable, and you need a process-safe lock around the section of code that reads and increments it.
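The difference between passing a return value and passing the callable can be seen without multiprocessing at all. The snippet below is only an illustrative sketch: slow_value is a hypothetical stand-in for tt.next(), and the 0.2 second delay is arbitrary. It is written so that it should run on both Python 2 and Python 3.

```python
import time


def slow_value():
    # Hypothetical stand-in for tt.next(): slow, then returns a value.
    time.sleep(0.2)
    return 42


start = time.time()
eager_args = (slow_value(),)    # the call happens right here, in the caller
eager_elapsed = time.time() - start

start = time.time()
deferred_args = (slow_value,)   # only a reference is stored; nothing runs yet
deferred_elapsed = time.time() - start

print(eager_args[0])                 # the already-computed result
print(callable(deferred_args[0]))    # the function itself, still to be called
```

Building eager_args pays the full delay in the caller, while building deferred_args costs almost nothing because the call is left to whoever receives the function. The full version below applies the same idea to tt.next.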

import multiprocessing
import logging
import time

class gener(object):
    def __init__(self, m):
        self.m = m
        self.c = multiprocessing.Value('i', 0)

    def __iter__(self):
        return self

    def next(self):
        print "n"
        time.sleep(3)
        with self.c.get_lock():
            ret = self.m[self.c.value]
            self.c.value += 1
        return ret 

def gen(func):
    t = func()
    itst[t] = t

if __name__ == "__main__":
    m = [0,1,2,3]
    iter_state = 0

    tt  = gener(m)
    itst = multiprocessing.Array('i', 3)
    multiprocessing.log_to_stderr(logging.DEBUG)    

    tm = time.time()

    jobs = [ multiprocessing.Process(target=gen, args=(tt.next,)) for _ in range(3)]
    for j in jobs:
        j.start()
    for j in jobs:
        j.join()

    for i in itst: 
        print i

    tm = time.time() - tm
    print tm

Output:

[INFO/Process-3] child process calling self.run()
n
[INFO/Process-1] child process calling self.run()
[INFO/Process-2] child process calling self.run()
n
n
[INFO/Process-2] process shutting down
[INFO/Process-3] process shutting down
[INFO/Process-1] process shutting down
[DEBUG/Process-1] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-1] running the remaining "atexit" finalizers
[DEBUG/Process-3] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-3] running the remaining "atexit" finalizers
[DEBUG/Process-2] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-2] running the remaining "atexit" finalizers
[INFO/Process-2] process exiting with exitcode 0
[INFO/Process-1] process exiting with exitcode 0
[INFO/Process-3] process exiting with exitcode 0
0
1
2
3.02282786369
[INFO/MainProcess] process shutting down
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
[DEBUG/MainProcess] running the remaining "atexit" finalizers
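
For readers on Python 3, the same approach might look roughly like the sketch below. It is not a drop-in replacement for the code above: the names Gener, worker and run_jobs, the delay parameter, and the StopIteration check are my own additions for illustration. It also assumes a Unix-like system where the "fork" start method is available, so that the bound method and the shared objects are inherited by the children rather than pickled.

```python
import multiprocessing
import time

# Assumption: the "fork" start method is available (Unix-like systems only).
ctx = multiprocessing.get_context("fork")


class Gener:
    def __init__(self, items, delay):
        self.items = items
        self.delay = delay
        self.index = ctx.Value("i", 0)   # counter shared between processes

    def __iter__(self):
        return self

    def __next__(self):
        time.sleep(self.delay)           # the slow part, now run in the child
        with self.index.get_lock():      # one process at a time touches the counter
            if self.index.value >= len(self.items):
                raise StopIteration
            item = self.items[self.index.value]
            self.index.value += 1
        return item


def worker(fetch, results):
    item = fetch()                       # the iterator is advanced inside the child
    results[item] = item


def run_jobs(n_jobs=3, delay=0.5):
    source = Gener([0, 1, 2, 3], delay)
    results = ctx.Array("i", n_jobs)
    jobs = [ctx.Process(target=worker, args=(source.__next__, results))
            for _ in range(n_jobs)]
    start = time.time()
    for job in jobs:
        job.start()
    for job in jobs:
        job.join()                       # wait for every child, not just the last
    return list(results), time.time() - start


if __name__ == "__main__":
    values, elapsed = run_jobs()
    print(values)
    print(elapsed)
```

Because each child takes a distinct index under the lock, values should come back as [0, 1, 2], and the elapsed time should be close to a single delay rather than three of them.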

Upvotes: 2
