Reputation: 5797
I have code that looks something like this:
import multiprocessing
import logging
import time

m = [0, 1, 2, 3]
iter_state = 0

class gener(object):
    def __init__(self, m):
        self.m = m
        self.c = 0

    def __iter__(self):
        return self

    def next(self):
        print "n"
        time.sleep(3)
        ret = self.m[self.c]
        self.c += 1
        return ret

tt = gener(m)
itst = multiprocessing.Array('i', 3)

def gen(t):
    itst[t] = t

multiprocessing.log_to_stderr(logging.DEBUG)

tm = time.time()

job1 = multiprocessing.Process(target=gen, args=(tt.next(),))
job2 = multiprocessing.Process(target=gen, args=(tt.next(),))
job3 = multiprocessing.Process(target=gen, args=(tt.next(),))

job1.start()
job2.start()
job3.start()
job3.join()

for i in itst:
    print i

tm = time.time() - tm
print tm
With the following output:
n
n
n
[INFO/Process-1] child process calling self.run()
[INFO/Process-1] process shutting down
[DEBUG/Process-1] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-1] running the remaining "atexit" finalizers
[INFO/Process-1] process exiting with exitcode 0
[INFO/Process-2] child process calling self.run()
[INFO/Process-2] process shutting down
[DEBUG/Process-2] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-2] running the remaining "atexit" finalizers
[INFO/Process-2] process exiting with exitcode 0
[INFO/Process-3] child process calling self.run()
[INFO/Process-3] process shutting down
[DEBUG/Process-3] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-3] running the remaining "atexit" finalizers
[INFO/Process-3] process exiting with exitcode 0
0
1
2
9.01742887497
[INFO/MainProcess] process shutting down
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
[DEBUG/MainProcess] running the remaining "atexit" finalizers
So we can see that the parallelization doesn't actually happen at all. However, when the time.sleep() call is placed inside the gen() function instead, we see this:
import multiprocessing
import logging
import time

m = [0, 1, 2, 3]
iter_state = 0

class gener(object):
    def __init__(self, m):
        self.m = m
        self.c = 0

    def __iter__(self):
        return self

    def next(self):
        print "n"
        ret = self.m[self.c]
        self.c += 1
        return ret

tt = gener(m)
itst = multiprocessing.Array('i', 3)

def gen(t):
    time.sleep(3)
    itst[t] = t

multiprocessing.log_to_stderr(logging.DEBUG)

tm = time.time()

job1 = multiprocessing.Process(target=gen, args=(tt.next(),))
job2 = multiprocessing.Process(target=gen, args=(tt.next(),))
job3 = multiprocessing.Process(target=gen, args=(tt.next(),))

job1.start()
job2.start()
job3.start()
job3.join()

for i in itst:
    print i

tm = time.time() - tm
print tm
n
n
n
[INFO/Process-1] child process calling self.run()
[INFO/Process-2] child process calling self.run()
[INFO/Process-3] child process calling self.run()
[INFO/Process-1] process shutting down
[DEBUG/Process-1] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-1] running the remaining "atexit" finalizers
[INFO/Process-1] process exiting with exitcode 0
[INFO/Process-3] process shutting down
[DEBUG/Process-3] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-3] running the remaining "atexit" finalizers
[INFO/Process-3] process exiting with exitcode 0
[INFO/Process-2] process shutting down
[DEBUG/Process-2] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-2] running the remaining "atexit" finalizers
[INFO/Process-2] process exiting with exitcode 0
0
1
2
3.01985812187
[INFO/MainProcess] process shutting down
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
[DEBUG/MainProcess] running the remaining "atexit" finalizers
Now we can see that the parallelization works fine! But unfortunately it only works for the gen() function.

My question: how can we make the parallelization work inside the next() method too?
Upvotes: 1
Views: 1076
Reputation: 94891
Well, you could do it by only actually calling next() inside of gen. Right now, you're calling tt.next() in the parent process and passing the return value to the child. Instead, you should pass the tt.next method itself to the child. But note that by doing that, you need to make gener.c a process-shared variable, and use a process-safe lock to protect the section of code where you increment it.
import multiprocessing
import logging
import time

class gener(object):
    def __init__(self, m):
        self.m = m
        self.c = multiprocessing.Value('i', 0)

    def __iter__(self):
        return self

    def next(self):
        print "n"
        time.sleep(3)
        with self.c.get_lock():
            ret = self.m[self.c.value]
            self.c.value += 1
        return ret

def gen(func):
    t = func()
    itst[t] = t

if __name__ == "__main__":
    m = [0, 1, 2, 3]
    iter_state = 0
    tt = gener(m)
    itst = multiprocessing.Array('i', 3)
    multiprocessing.log_to_stderr(logging.DEBUG)

    tm = time.time()

    jobs = [multiprocessing.Process(target=gen, args=(tt.next,)) for _ in range(3)]
    for j in jobs:
        j.start()
    for j in jobs:
        j.join()

    for i in itst:
        print i

    tm = time.time() - tm
    print tm
Output:
[INFO/Process-3] child process calling self.run()
n
[INFO/Process-1] child process calling self.run()
[INFO/Process-2] child process calling self.run()
n
n
[INFO/Process-2] process shutting down
[INFO/Process-3] process shutting down
[INFO/Process-1] process shutting down
[DEBUG/Process-1] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-1] running the remaining "atexit" finalizers
[DEBUG/Process-3] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-3] running the remaining "atexit" finalizers
[DEBUG/Process-2] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-2] running the remaining "atexit" finalizers
[INFO/Process-2] process exiting with exitcode 0
[INFO/Process-1] process exiting with exitcode 0
[INFO/Process-3] process exiting with exitcode 0
0
1
2
3.02282786369
[INFO/MainProcess] process shutting down
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
[DEBUG/MainProcess] running the remaining "atexit" finalizers
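To see the Value/get_lock() pattern in isolation, here is a minimal, standalone sketch of a process-shared counter (Python 3 print syntax; the name bump is illustrative, not from the code above). Each worker increments the same counter while holding its lock, so increments from different processes can't interleave and get lost:

import multiprocessing

def bump(counter, n):
    # Increment the shared counter n times, holding the lock so that
    # the read-modify-write is atomic across processes.
    for _ in range(n):
        with counter.get_lock():
            counter.value += 1

if __name__ == "__main__":
    counter = multiprocessing.Value('i', 0)  # process-shared int, starts at 0
    workers = [multiprocessing.Process(target=bump, args=(counter, 1000))
               for _ in range(4)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print(counter.value)  # 4000 with the lock; without it, updates can be lost

Without get_lock(), `counter.value += 1` is a separate read and write, so two processes can read the same value and one increment disappears.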
Upvotes: 2