Reputation: 23068
I have a class defined and used as follows, that includes a method called in the context of a multiprocessing pool:
import pandas as pd
from multiprocessing import Pool

class MyClass:
    def __init__(self, big_object):
        self.big_object = big_object

    def compute_one(self, myvar):
        return self.big_object.do_something_with(myvar)

    def compute_all(self, myvars):
        with Pool() as pool:
            results = pool.map(self.compute_one, myvars)
        return results

big_object = pd.read_csv('very_big_file.csv')
cls = MyClass(big_object)
vars = ['foo', 'bar', 'baz']
all_results = cls.compute_all(vars)
This "works" but the issue is that big_object
takes several Gbs in RAM, and with the above code, this big object is copied in RAM for every process launched by multiprocessing.Pool
.
How to modify the above code so that big_object
would be shared among all processes, but still able to be defined at class instantiation?
-- EDIT
Some context into what I'm trying to do might help identify a different approach altogether.
Here, big_object is a Pandas dataframe with 1M+ rows and tens of columns. And compute_one computes statistics based on a particular column.
A simplified high-level view would be (extremely summarized):
current_col = manufacturer
for each rem_col of the remaining categorical columns:
    big_object.groupby([current_col, rem_col]).size()
The end result of this would look like:
| manufacturer | country_us | country_gb | client_male | client_female |
|--------------|------------|------------|-------------|----------------|
| bmw          |         15 |         18 |          74 |             13 |
| mercedes     |          2 |         24 |          12 |             17 |
| jaguar       |         48 |        102 |          23 |             22 |
So in essence, this is about computing statistics for every column, on every other column, over the entire source dataframe.
Using multiprocessing here allows, for a given current_col, computing all the statistics on the remaining_cols in parallel. Among these statistics are not only sum but also mean (for the remaining numerical columns).
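For illustration, here is a minimal sketch of that kind of per-column statistic on a toy in-memory dataframe; the column names (manufacturer, country, price) are made up for the example:

import pandas as pd

# Toy stand-in for big_object; column names are hypothetical.
df = pd.DataFrame({
    'manufacturer': ['bmw', 'bmw', 'mercedes', 'jaguar', 'jaguar'],
    'country':      ['us',  'gb',  'gb',       'us',     'gb'],
    'price':        [30000, 32000, 45000,      60000,    65000],
})

current_col = 'manufacturer'

# Count statistic: rows per (manufacturer, country) pair, as in the table above.
counts = df.groupby([current_col, 'country']).size()

# Mean statistic for a remaining numerical column.
means = df.groupby(current_col)['price'].mean()

print(counts)
print(means)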
A dirty approach using a global variable instead (a global big_object instantiated outside the class) brings the entire running time down from 5+ hours to about 20 minutes. My only issue is that I'd like to avoid this global-object approach.
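For reference, a minimal sketch of that global-object workaround, assuming a Unix 'fork' start method so workers inherit the dataframe copy-on-write instead of unpickling their own copy (the file name and column names are placeholders):

from multiprocessing import Pool
import pandas as pd

# Module-level global: with the 'fork' start method, worker processes inherit
# this object copy-on-write rather than receiving a pickled copy per task.
# (Under 'spawn', e.g. on Windows, each worker re-imports the module and
# re-reads the CSV instead.)
big_object = pd.read_csv('very_big_file.csv')

def compute_one(rem_col):
    # Cross-tabulate the current column against one remaining column.
    return big_object.groupby(['manufacturer', rem_col]).size()

if __name__ == '__main__':
    with Pool() as pool:
        results = pool.map(compute_one, ['country', 'client_gender'])
    print(results)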
Upvotes: 0
Views: 672
Reputation: 1
A minor addition to the previous reply.
The solution provided by @Booboo worked for me. However, depending on the size of the data, the manager.shutdown() at the end of the "with ... manager" block causes a PermissionError, for some reason.
The error seems to appear only with very large cls objects: when cls occupies 5-10% of RAM there is no error; when it occupies 80% of RAM (~50 GB in my case), the error appears.
I wasn't able to pin down the reason for this behavior, but deleting cls before the manager shuts down helped to avoid it:
if __name__ == '__main__':
    MyClassManager.register('MyClass', MyClass)
    with MyClassManager() as manager:
        cls = manager.MyClass('very_big_file.csv')
        my_vars = ['foo', 'bar', 'baz']
        with Pool() as pool:
            all_results = pool.map(cls.compute_one, my_vars)
        del cls  # <------- added line
        print(all_results)
Below is the traceback of the Permission error:
Traceback (most recent call last):
  File "D:\folder\main.py", line 73, in <module>
    with MyClassManager() as manager:
  File "C:\...\Python311\Lib\multiprocessing\managers.py", line 657, in __exit__
    self.shutdown()
  File "C:\...\Python311\Lib\multiprocessing\util.py", line 224, in __call__
    res = self._callback(*self._args, **self._kwargs)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\...\Python311\Lib\multiprocessing\managers.py", line 681, in _finalize_manager
    process.terminate()
  File "C:\...\Python311\Lib\multiprocessing\process.py", line 133, in terminate
    self._popen.terminate()
  File "C:\...\Python311\Lib\multiprocessing\popen_spawn_win32.py", line 124, in terminate
    _winapi.TerminateProcess(int(self._handle), TERMINATE)
PermissionError: [WinError 5] Permission denied

Process finished with exit code 1
Upvotes: 0
Reputation: 44043
One solution is to make MyClass a managed class, just like, for example, a managed dictionary created with multiprocessing.Manager().dict(). To ensure that there is only one copy of the "big object", I would first modify MyClass.__init__ to take a CSV file path argument; in this way the "big object" is constructed only in the manager's process. Second, I would remove the compute_all logic from MyClass and invoke the multiprocessing.pool.Pool.map method in such a way that what is passed as the worker function is the managed object's proxy.
What you save in space you give up in some performance, since each invocation of method compute_one results in more-or-less the equivalent of a remote method call to the manager's process, where it is actually executed.
from multiprocessing import Pool
from multiprocessing.managers import BaseManager
import pandas as pd

class MyClassManager(BaseManager):
    pass

class MyClass:
    def __init__(self, path):
        self.big_object = pd.read_csv(path)

    def compute_one(self, myvar):
        # For demo purposes just check if column myvar exists in the dataframe:
        return myvar in self.big_object

# Required for Windows:
if __name__ == '__main__':
    MyClassManager.register('MyClass', MyClass)
    with MyClassManager() as manager:
        cls = manager.MyClass('very_big_file.csv')
        # vars is a built-in function name and should not be used:
        my_vars = ['foo', 'bar', 'baz']
        with Pool() as pool:
            all_results = pool.map(cls.compute_one, my_vars)
        print(all_results)
Upvotes: 1