Reputation: 937
Testing Environment:
I wrote a simple program to test process safety. I find that subprocess2 won't run until subprocess1 has finished, so it seems that the instance variable self.count is process-safe. How do the processes share this variable? Do they share self directly?
Another question: when I use a Queue, I have to use multiprocessing.Manager to guarantee process safety manually, or the program won't run as expected. (If you uncomment self.queue = multiprocessing.Queue(), the program won't run normally, but using self.queue = multiprocessing.Manager().Queue() is OK.)
The last question is: why is the final result 900? I think it should be 102.
Sorry for asking so many questions, but I'm indeed curious about these things. Thanks a lot!
Code:
import multiprocessing
import time

class Test:
    def __init__(self):
        self.pool = multiprocessing.Pool(1)
        self.count = 0
        #self.queue = multiprocessing.Queue()
        #self.queue = multiprocessing.Manager().Queue()

    def subprocess1(self):
        for i in range(3):
            print("Subprocess 1, count = %d" % self.count)
            self.count += 1
            time.sleep(1)
        print("Subprocess 1 Completed")

    def subprocess2(self):
        self.count = 100
        for i in range(3):
            print("Subprocess 2, count = %d" % self.count)
            self.count += 1
            time.sleep(1)
        print("Subprocess 2 Completed")

    def start(self):
        self.pool.apply_async(func=self.subprocess1)
        print("Subprocess 1 has been started")
        self.count = 900
        self.pool.apply_async(func=self.subprocess2)
        print("Subprocess 2 has been started")
        self.pool.close()
        self.pool.join()

    def __getstate__(self):
        self_dict = self.__dict__.copy()
        del self_dict['pool']
        return self_dict

    def __setstate__(self, state):
        self.__dict__.update(state)

if __name__ == '__main__':
    test = Test()
    test.start()
    print("Final Result, count = %d" % test.count)
Output:
Subprocess 1 has been started
Subprocess 2 has been started
Subprocess 1, count = 0
Subprocess 1, count = 1
Subprocess 1, count = 2
Subprocess 1 Completed
Subprocess 2, count = 100
Subprocess 2, count = 101
Subprocess 2, count = 102
Subprocess 2 Completed
Final Result, count = 900
Upvotes: 0
Views: 748
Reputation: 15513
Question: ... why the final result is 900? I think it should be 102.
Even with sharing, the result would not be 102: range is 0-based, so each loop runs 3 iterations, and the final increment happens after the last print. Subprocess 2 therefore ends at 103 (it prints 100, 101, 102, then increments once more), subprocess 1 ends at 3, and the combined total is 106.
You can get the expected output, for instance, like this:
import multiprocessing as mp
import time

class PoolTasks(object):
    def __init__(self):
        self.count = None

    def task(self, n, start):
        import os
        pid = os.getpid()
        count = start
        print("Task %s in Process %s has been started - start=%s" % (n, pid, count))
        for i in range(3):
            print("Task %s in Process %s, count = %d " % (n, pid, count))
            count += 1
            time.sleep(1)
        print("Task %s in Process %s has been completed - count=%s" % (n, pid, count))
        return count

    def start(self):
        with mp.Pool(processes=4) as pool:
            # launch multiple tasks asynchronously using worker processes
            multiple_results = [pool.apply_async(self.task, p)
                                for p in [(1, 0), (2, 100)]]
            # sum the results from the tasks
            self.count = 0
            for res in multiple_results:
                self.count += res.get()

if __name__ == '__main__':
    pool = PoolTasks()
    pool.start()
    print('sum(count) = %s' % pool.count)
Output:
Task 1 in Process 5601 has been started - start=0
Task 1 in Process 5601, count = 0
Task 2 in Process 5602 has been started - start=100
Task 2 in Process 5602, count = 100
Task 1 in Process 5601, count = 1
Task 2 in Process 5602, count = 101
Task 1 in Process 5601, count = 2
Task 2 in Process 5602, count = 102
Task 1 in Process 5601 has been completed - count=3
Task 2 in Process 5602 has been completed - count=103
sum(count) = 106
Tested with Python 3.4.2
Upvotes: 2
Reputation: 488143
The underlying details are rather tricky (see the Python3 documentation for more, and note that the details are slightly different for Python2), but essentially, when you pass self.subprocess1 or self.subprocess2 as an argument to self.pool.apply_async, Python ends up calling:
pickle.dumps(self)
in the main process (the initial one on Linux before forking, or the one invoked as __main__ on Windows) and then, eventually, pickle.loads() of the resulting byte-string in the pool process.1 The pickle.dumps code winds up calling your own __getstate__ function; that function's job is to return something that can be serialized to a byte-string.2 The subsequent pickle.loads creates a blank instance of the appropriate type, does not call its __init__, and then uses its __setstate__ function to fill in the object (instead of __init__-ing it).
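To see that sequence concretely, here is a minimal standalone sketch (a toy class, not the question's code) that prints when each hook runs:
import pickle

class Demo:
    def __init__(self):
        print("__init__ runs")
        self.count = 0

    def __getstate__(self):
        # Called by pickle.dumps in the pickling process.
        print("__getstate__ runs")
        return self.__dict__.copy()

    def __setstate__(self, state):
        # Called by pickle.loads on a blank instance; __init__ is skipped.
        print("__setstate__ runs")
        self.__dict__.update(state)

d = Demo()                  # prints: __init__ runs
blob = pickle.dumps(d)      # prints: __getstate__ runs
clone = pickle.loads(blob)  # prints: __setstate__ runs (and nothing else)
print(clone.count)          # 0: the state came across in the byte-string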
Your __getstate__ returns the dictionary holding the state of self, minus the pool object, for good reason:
>>> import multiprocessing
>>> x = multiprocessing.Pool(1)
>>> import pickle
>>> pickle.dumps(x)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/local/lib/python3.5/multiprocessing/pool.py", line 492, in __reduce__
'pool objects cannot be passed between processes or pickled'
NotImplementedError: pool objects cannot be passed between processes or pickled
Since pool objects refuse to be pickled (serialized), we must avoid even attempting to do that.
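For instance, here is a minimal sketch of the same workaround the question's code uses (HasPool is a made-up name for illustration):
import multiprocessing
import pickle

class HasPool:
    def __init__(self):
        self.pool = multiprocessing.Pool(1)
        self.count = 0

    def __getstate__(self):
        # Drop the unpicklable pool before serializing the rest.
        state = self.__dict__.copy()
        del state['pool']
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)

if __name__ == '__main__':
    obj = HasPool()
    clone = pickle.loads(pickle.dumps(obj))  # works: the pool never gets pickled
    print(clone.count)             # 0
    print(hasattr(clone, 'pool'))  # False: the copy has no pool at all
    obj.pool.close()
    obj.pool.join()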
In any case, all of this means that the pool process has its own copy of self, which has its own copy of self.count (and is missing self.pool entirely). These items are not shared in any way, so it is safe to modify self.count there.
I find the simplest mental model of this is to give each worker process a name: Alice, Bob, Carol, and so on, if you like. You can then think of the main process as "you": you copy something and give the copy to Alice, then copy it and give that one to Bob, and so on. Function calls, such as apply or apply_async, copy all of their arguments, including the implied self for bound methods.
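To make the copying visible, here is a small sketch with made-up names (Copied, bump), in which the worker increments its own copy of the count while the parent's copy stays untouched:
import multiprocessing
import os

class Copied:
    def __init__(self):
        self.count = 0

    def bump(self):
        # Runs in the worker process, on the worker's private copy of self.
        self.count += 1
        return os.getpid(), self.count

if __name__ == '__main__':
    obj = Copied()
    with multiprocessing.Pool(1) as pool:
        worker_pid, worker_count = pool.apply(obj.bump)
    print(worker_pid != os.getpid())  # True: it really ran elsewhere
    print(worker_count)               # 1: the worker saw its increment
    print(obj.count)                  # 0: the parent's copy never changed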
When using a multiprocessing.Queue, you get something that knows how to work between the various processes, sharing data as needed, with appropriate synchronization. This lets you pass copies of data back and forth. However, like a pool instance, a multiprocessing.Queue instance cannot be copied. The multiprocessing routines do let you copy a multiprocessing.Manager().Queue() instance, which is good if you want a copied and otherwise private Queue() instance. (The internal details of this are complicated.3)
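As a rough sketch of the difference (the function name produce is invented for illustration), a Manager().Queue() proxy can be handed to pool workers as an ordinary argument, whereas passing a plain multiprocessing.Queue() the same way fails with a RuntimeError:
import multiprocessing

def produce(q, start):
    # Runs in a worker process; puts values on the proxied queue.
    for i in range(3):
        q.put(start + i)

if __name__ == '__main__':
    # queue = multiprocessing.Queue()   # would raise: Queue objects should only
    #                                   # be shared between processes through inheritance
    queue = multiprocessing.Manager().Queue()  # a copyable proxy
    with multiprocessing.Pool(2) as pool:
        pool.starmap(produce, [(queue, 0), (queue, 100)])
    items = []
    while not queue.empty():
        items.append(queue.get())
    print(sorted(items))  # [0, 1, 2, 100, 101, 102]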
The final result you get is just 900 because you are looking only at the original self object.
Note that each applied function (from apply or apply_async) returns a result. This result is copied back from the worker process to the main process. With apply_async, you may choose to be called back as soon as the result is ready. If you want the result, you should save it somewhere, or use the get function (as shown in the other answer) to wait for it when you need it.
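For instance, a minimal sketch of both ways to collect a result (the work function is made up):
import multiprocessing

def work(n):
    # Runs in a worker; the return value is pickled and copied back.
    return n * n

if __name__ == '__main__':
    pool = multiprocessing.Pool(1)
    res = pool.apply_async(work, (7,))
    print(res.get())  # 49: blocks until the copied-back result arrives
    # Or register a callback that runs in the main process when ready:
    pool.apply_async(work, (8,), callback=lambda r: print("callback got", r))
    pool.close()
    pool.join()  # the callback fires before join returns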
1 We can say "the" pool process here without worrying about which one, as you limited yourself to just one. In any case, though, there is a simple byte-oriented, two-way communications stream, managed by the multiprocessing code, connecting each worker process with the parent process that invoked it. If you create two such pool processes, each one has its own byte-stream connecting to the main process. This means it would not matter if there were two or more: the behavior would be the same.
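If you want to experiment with such a stream yourself, multiprocessing.Pipe gives you a two-way channel of the same general kind (this is only a concept sketch, not Pool's actual internals):
import multiprocessing

def child(conn):
    # The worker end: receive one message, send a reply, hang up.
    msg = conn.recv()
    conn.send(msg.upper())
    conn.close()

if __name__ == '__main__':
    parent_end, child_end = multiprocessing.Pipe()  # two-way stream
    p = multiprocessing.Process(target=child, args=(child_end,))
    p.start()
    parent_end.send("hello")
    print(parent_end.recv())  # HELLO
    p.join()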
2 This "something" is often a dictionary, but see Simple example of use of __setstate__ and __getstate__ for details.
3 The output of pickle.dumps on such an instance is:
>>> pickle.dumps(y)
(b'\x80\x03cmultiprocessing.managers\n'
b'RebuildProxy\n'
b'q\x00(cmultiprocessing.managers\n'
b'AutoProxy\n'
b'q\x01cmultiprocessing.managers\n'
b'Token\n'
b'q\x02)\x81q\x03X\x05\x00\x00\x00Queueq\x04X$\x00\x00\x00/tmp/pymp-pog4bhub/listener-0_uwd8c9q\x05X\t\x00\x00\x00801b92400q\x06\x87q\x07bX\x06\x00\x00\x00pickleq\x08}q\tX\x07\x00\x00\x00exposedq\n'
b'(X\x05\x00\x00\x00emptyq\x0bX\x04\x00\x00\x00fullq\x0cX\x03\x00\x00\x00getq\rX\n'
b'\x00\x00\x00get_nowaitq\x0eX\x04\x00\x00\x00joinq\x0fX\x03\x00\x00\x00putq\x10X\n'
b'\x00\x00\x00put_nowaitq\x11X\x05\x00\x00\x00qsizeq\x12X\t\x00\x00\x00task_doneq\x13tq\x14stq\x15Rq\x16.\n')
I did a little trickiness to split this at newlines and then manually added the parentheses, just to keep the long line from being unmanageably long. The arguments will vary on different systems; this particular one uses a file system object that is a listener socket, which allows cooperating Python processes to establish a new byte stream between themselves.
Upvotes: 2