Jivan

Reputation: 23068

How to share an instance variable between multiple processes

I have a class, defined and used as follows, which includes a method called in the context of a multiprocessing pool:

import pandas as pd
from multiprocessing import Pool

class MyClass:
    def __init__(self, big_object):
        self.big_object = big_object

    def compute_one(self, myvar):
        return self.big_object.do_something_with(myvar)

    def compute_all(self, myvars):
        with Pool() as pool:
            results = pool.map(self.compute_one, myvars)
        return results


big_object = pd.read_csv('very_big_file.csv')
cls = MyClass(big_object)

vars = ['foo', 'bar', 'baz']
all_results = cls.compute_all(vars)

This "works" but the issue is that big_object takes several Gbs in RAM, and with the above code, this big object is copied in RAM for every process launched by multiprocessing.Pool.

How can I modify the above code so that big_object is shared among all processes, yet can still be defined at class instantiation?

-- EDIT

Some context on what I'm trying to do might help identify a different approach altogether.

Here, big_object is a Pandas dataframe with 1M+ rows and tens of columns, and compute_one computes statistics based on a particular column.

A simplified high-level view would be something like this pseudo-code (extremely summarized; df stands for big_object, and compute_stats is a made-up helper):
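stats = {}
for current_col in df.columns:
    remaining_cols = [c for c in df.columns if c != current_col]
    # For every remaining column, compute statistics over the values
    # of current_col (counts/sums, plus means for numerical columns):
    stats[current_col] = {
        other_col: compute_stats(df, current_col, other_col)
        for other_col in remaining_cols
    }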

The end result of this would look like:

manufacturer    country_us    country_gb    client_male    client_female
bmw                     15            18             74               13
mercedes                 2            24             12               17
jaguar                  48           102             23               22

So in essence, this is about computing statistics for every column, on every other column, over the entire source dataframe.

Using multiprocessing here makes it possible, for a given current_col, to compute all the statistics on remaining_cols in parallel. Among these statistics are not only sums but also means (for the remaining numerical columns).

A dirty approach using a global variable instead (a global big_object instantiated at module level, outside the class, roughly as sketched below) brings the entire running time from 5+ hours down to about 20 minutes. My only issue is that I'd like to avoid this global-object approach.
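For reference, that workaround looks roughly like this (a sketch; it relies on the fork start method, the Linux default, so that workers inherit the global copy-on-write instead of pickling it):

import pandas as pd
from multiprocessing import Pool

big_object = pd.read_csv('very_big_file.csv')  # module-level global

def compute_one(myvar):
    # Under fork, each worker inherits big_object copy-on-write
    # rather than receiving a pickled copy.
    return big_object.do_something_with(myvar)

if __name__ == '__main__':
    with Pool() as pool:
        all_results = pool.map(compute_one, ['foo', 'bar', 'baz'])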

Upvotes: 0

Views: 672

Answers (2)

yer

Reputation: 1

A minor addition to @Booboo's answer.

The solution provided by @Booboo worked for me. However, depending on the size of the data, the manager.shutdown() call at the end of the with ... manager block raises a PermissionError for some reason.

The error seems to appear only with very large cls objects: when cls occupies 5-10% of RAM there is no error; when it occupies ~80% of RAM (~50 GB in my case), the error appears.

I wasn't able to pin down the reason for this behavior, but deleting cls before the manager shuts down avoids it:

# Imports, MyClass and MyClassManager as in @Booboo's answer:
if __name__ == '__main__':
    MyClassManager.register('MyClass', MyClass)
    with MyClassManager() as manager:
        cls = manager.MyClass('very_big_file.csv')

        my_vars = ['foo', 'bar', 'baz']
        with Pool() as pool:
            all_results = pool.map(cls.compute_one, my_vars)

        del cls  # <------- added line

    print(all_results)

Below is the traceback of the PermissionError:

Traceback (most recent call last):
  File "D:\folder\main.py", line 73, in <module>
    with MyClassManager() as manager:
  File "C:\...\Python311\Lib\multiprocessing\managers.py", line 657, in __exit__
    self.shutdown()
  File "C:\...\Python311\Lib\multiprocessing\util.py", line 224, in __call__
    res = self._callback(*self._args, **self._kwargs)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\...\Python311\Lib\multiprocessing\managers.py", line 681, in _finalize_manager
    process.terminate()
  File "C:\...\Python311\Lib\multiprocessing\process.py", line 133, in terminate
    self._popen.terminate()
  File "C:\...\Python311\Lib\multiprocessing\popen_spawn_win32.py", line 124, in terminate
    _winapi.TerminateProcess(int(self._handle), TERMINATE)
PermissionError: [WinError 5] Permission denied

Process finished with exit code 1

Upvotes: 0

Booboo

Reputation: 44043

One solution is to make MyClass a managed class, just like, for example, a managed dictionary created with multiprocessing.Manager().dict(). To ensure that there is only one copy of the "big object", first I would modify MyClass.__init__ to take a CSV file path argument; this way the "big object" is constructed only in the manager's process. Second, I would remove the compute_all logic from MyClass and invoke the multiprocessing.pool.Pool.map method in such a way that what is passed as the worker function is a method of the managed object's proxy.
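For reference, the managed-dictionary pattern this generalizes looks like the following (a minimal sketch; worker and the dictionary contents are only illustrative):

from multiprocessing import Manager, Pool

def worker(args):
    shared, key = args
    # Each lookup on the proxy is a remote call into the manager's process:
    return shared[key]

if __name__ == '__main__':
    with Manager() as manager:
        shared = manager.dict({'foo': 1, 'bar': 2})
        with Pool() as pool:
            print(pool.map(worker, [(shared, 'foo'), (shared, 'bar')]))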

What you save in space you give up in some performance: each invocation of the compute_one method is more or less the equivalent of a remote method call to the manager's process, where it is actually executed.

from multiprocessing import Pool
from multiprocessing.managers import BaseManager
import pandas as pd

class MyClassManager(BaseManager):
    pass

class MyClass:
    def __init__(self, path):
        self.big_object = pd.read_csv(path)

    def compute_one(self, myvar):
        # For demo purposes just check if column myvar exists in dataframe:
        return myvar in self.big_object

# Required for Windows:
if __name__ == '__main__':
    MyClassManager.register('MyClass', MyClass)
    with MyClassManager() as manager:
        cls = manager.MyClass('very_big_file.csv')

        # vars is a built-in function name and should not be used:
        my_vars = ['foo', 'bar', 'baz']
        with Pool() as pool:
            all_results = pool.map(cls.compute_one, my_vars)
        print(all_results)

Upvotes: 1
