Reputation: 22979
Consider the following snippet:
import numpy as np
import multiprocessing as mp
import time

def work_standalone(args):
    return 2

class Worker:
    def __init__(self):
        self.data = np.random.random(size=(10000, 10000))
        # leave a trace whenever init is called
        with open('rnd-%d' % np.random.randint(100), 'a') as f:
            f.write('init called\n')

    def work_internal(self, args):
        return 2

    def _run(self, target):
        with mp.Pool() as pool:
            tasks = [[idx] for idx in range(16)]
            result = pool.imap(target, tasks)
            for res in result:
                pass

    def run_internal(self):
        self._run(self.work_internal)

    def run_standalone(self):
        self._run(work_standalone)

if __name__ == '__main__':
    t1 = time.time()
    Worker().run_standalone()
    t2 = time.time()
    print(f'Standalone took {t2 - t1:.3f} seconds')

    t3 = time.time()
    Worker().run_internal()
    t4 = time.time()
    print(f'Internal took {t3 - t4:.3f} seconds')
I.e. we have an object holding a large array that uses multiprocessing to parallelize some work that has nothing to do with that array, i.e. never reads from or writes to it. Yet the location of the worker function has a huge impact on the runtime:
Standalone took 0.616 seconds
Internal took 19.917 seconds
Why is this happening? I am completely lost. Note that __init__ is only called twice, so the random data is not created for every new process in the pool. The only reason I can think of why this would be slow is that the data is copied around, but that would not make sense since it is never used anywhere, and Python is supposed to use copy-on-write semantics. Also note that the difference disappears if you make run_internal a static method.
Upvotes: 2
Views: 35
Reputation: 1792
The issue you have is due to the target you are calling from the pool. That target is a method bound to the Worker instance.

Now, you're right that __init__() is only called twice. But remember, whenever anything is sent to or from the worker processes, Python first has to pickle it.

So, because your target is self.work_internal, Python has to pickle the Worker() instance every time imap sends a task to the pool. This leads to one issue: self.data being copied over and over again.
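A quick way to see this mechanism is to compare the pickled size of the plain function with that of the bound method. This is a minimal sketch, not part of the original post, and it uses a much smaller array so it runs quickly:

import pickle
import numpy as np

def work_standalone(args):
    return 2

class Worker:
    def __init__(self):
        # smaller than the 10000x10000 array above, so this demo runs fast
        self.data = np.random.random(size=(1000, 1000))

    def work_internal(self, args):
        return 2

if __name__ == '__main__':
    w = Worker()
    # A module-level function pickles to a tiny by-name reference...
    print(len(pickle.dumps(work_standalone)))   # a few dozen bytes
    # ...but a bound method drags the whole instance, including self.data, with it.
    print(len(pickle.dumps(w.work_internal)))   # roughly 8 MB for the 1000x1000 array

The same cost is paid for every task the pool dispatches when the target is self.work_internal.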
The following is the proof. I just added one input() statement and fixed the final time calculation.
import numpy as np
import multiprocessing as mp
import time

def work_standalone(args):
    return 2

class Worker:
    def __init__(self):
        self.data = np.random.random(size=(10000, 10000))
        # leave a trace whenever init is called
        with open('rnd-%d' % np.random.randint(100), 'a') as f:
            f.write('init called\n')

    def work_internal(self, args):
        return 2

    def _run(self, target):
        with mp.Pool() as pool:
            tasks = [[idx] for idx in range(16)]
            result = pool.imap(target, tasks)
            input("Wait for analysis")
            for res in result:
                pass

    def run_internal(self):
        self._run(self.work_internal)
        # self._run(work_standalone)

    def run_standalone(self):
        self._run(work_standalone)

def work_internal(target):
    with mp.Pool() as pool:
        tasks = [[idx] for idx in range(16)]
        result = pool.imap(target, tasks)
        for res in result:
            pass

if __name__ == '__main__':
    t1 = time.time()
    Worker().run_standalone()
    t2 = time.time()
    print(f'Standalone took {t2 - t1:.3f} seconds')

    t3 = time.time()
    Worker().run_internal()
    t4 = time.time()
    print(f'Internal took {t4 - t3:.3f} seconds')
You can run the code, and when it prints "Wait for analysis", go and check the memory usage in a process monitor.
The second time you see the message, press Enter and observe the memory usage increasing and then decreasing again.
On the other hand, if you change self._run(self.work_internal) to self._run(work_standalone), you will notice that it runs very fast, the memory usage does not grow, and the time taken is much shorter than with self.work_internal.
One way to solve your issue is to make data a class attribute instead of an instance attribute. In normal cases this prevents each instance from having to copy/re-initialise the variable, and since class attributes are not included when an instance is pickled, it also prevents this issue from occurring.
class Worker:
    data = np.random.random(size=(10000, 10000))

    def __init__(self):
        pass
    ...
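With data as a class attribute it lives on the class object rather than in each instance's __dict__, so pickling the instance (and hence the bound method) no longer carries the array along. A minimal sketch of that effect, again with a smaller array and not part of the original answer:

import pickle
import numpy as np

class Worker:
    # class-level attribute: stored on the class, not in the instance __dict__
    data = np.random.random(size=(1000, 1000))

    def work_internal(self, args):
        return 2

if __name__ == '__main__':
    w = Worker()
    # The pickled bound method is now tiny, because the array is not part of the instance state.
    print(len(pickle.dumps(w.work_internal)))

Note that every instance now shares the same data, which is fine here since the worker functions never touch it.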
Upvotes: 3