M B

Reputation: 13

Performance and memory duplication when using multiprocessing with a multi-argument function in Python?

I am having difficulty understanding the logic of optimizing multiprocessing in Python 3.11.

Context:

I am running on Ubuntu 22.04, x86 12 cores / 32 threads, 128 GB memory

Concerns:

(Please refer to code and result below).
Both multiprocessing variants that pass a local df (using map + partial, or starmap) take a lot more time than the variant that uses the global df.

It seemed ok to me but ...

... by activating the sleep(10) in both functions, I noticed that every spawned process takes around 1.2 GB of memory (according to the system monitor in Ubuntu), so I guess the df must have been duplicated even when declared global.

My question (eventually :-))

Thank you for your enlightenment

Code

from time import sleep
from multiprocessing import Pool
from functools import partial
from time import time

import numpy as np
import pandas as pd


def _fun_local(df, imax: int):
    # sleep(10)
    df_temp = df[:imax]
    return df_temp


def _fun_global(imax: int):
    global mydf
    # sleep(10)
    df_temp = mydf[:imax]
    return df_temp


def dummy():
    # Create a 1 GB df
    global mydf
    n_rows = 13_421_773
    n_cols = 10
    data = np.random.rand(n_rows, n_cols)
    mydf = pd.DataFrame(data, columns=[f'col_{i}' for i in range(n_cols)])
    # check memory footprint
    print('mydf (MB): ', mydf.memory_usage(deep=True).sum() / 1024 / 1024)

    imaxs = range(1, 33)

    # With local df, call function with partial
    start = time()
    with Pool() as pool:
        fun = partial(_fun_local, mydf)
        results_partial = pool.map(fun, imaxs)
    print(f'local-partial took: {time()-start}')

    # With local df, call function with starmap
    start = time()
    with Pool() as pool:
        results_starmap = pool.starmap(_fun_local, [(mydf, imax) for imax in imaxs])
    print(f'local-starmap took: {time()-start}')

    # With global mydf
    start = time()
    with Pool() as pool:
        results_global = pool.map(_fun_global, imaxs)
    print(f'global took: {time()-start}')

    return results_partial, results_starmap, results_global

if __name__ == '__main__':
    results = dummy()

Result:

mydf (MB):  1024.0001411437988
local-partial took: 89.05407881736755
local-starmap took: 88.06274890899658
global took: 0.09803605079650879

Upvotes: 1

Views: 45

Answers (1)

Booboo

Reputation: 44283

When you fork a child process (in this case you are forking os.cpu_count() processes, the default pool size), it is true that the child process inherits the memory of the forking process. But copy-on-write semantics are used, so when that inherited memory is modified, it is first copied, resulting in increased memory utilization. Even though the child process is not explicitly updating the global dataframe, once it is referenced it gets copied, because Python uses reference counts for memory management and referencing the dataframe increments its reference count, which is itself a write to the inherited memory. Now consider the following code.

from multiprocessing import Pool
import os
import time

SLEEP = False

some_data = [0, 1, 2, 3, 4, 5, 6, 7]

def worker(i):
    if SLEEP:
        time.sleep(.5)

    return some_data[i], os.getpid()

def main():
    with Pool(8) as pool:
        results = pool.map(worker, range(8))
        print('results =', results)

if __name__ == '__main__':
    main()

Prints:

results = [(0, 35988), (1, 35988), (2, 35988), (3, 35988), (4, 35988), (5, 35988), (6, 35988), (7, 35988)]

We see that a single pool process (PID=35988) processed all 8 submitted tasks. This is because the task was incredibly short-running, so a single pool process was able to pull all 8 tasks from the pool's task queue before the other pool processes finished starting and attempted to process tasks. This also means that global some_data was only referenced by a single pool process and therefore only copied once.

If now we change SLEEP to True, the output is now:

results = [(0, 45324), (1, 4828), (2, 19760), (3, 8420), (4, 41944), (5, 58220), (6, 46340), (7, 21628)]

Each of the 8 pool processes processed one task and therefore some_data was copied 8 times.
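Why does merely reading some_data count as a write? A minimal sketch, assuming CPython (where each object stores its own reference count): binding one more name to an object changes that stored count even though the object's contents are untouched. The helper name refcount_delta is made up for this illustration.

```python
import sys

def refcount_delta(obj):
    """Return how much obj's reference count grows when one extra name is bound to it."""
    before = sys.getrefcount(obj)
    alias = obj  # no mutation of obj's contents, just one more reference
    after = sys.getrefcount(obj)
    del alias
    return after - before

if __name__ == '__main__':
    some_data = [0, 1, 2, 3, 4, 5, 6, 7]
    # A positive delta means the count stored with the object was written to.
    print('refcount delta:', refcount_delta(some_data))
```

In a forked child, that update to the stored count is a write to inherited memory, which is what triggers copy-on-write.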

What Does This Mean?

Your _fun_global worker function is also short running if it is not sleeping and probably only a single pool process is processing all submitted tasks resulting in the dataframe being copied only once. But if you do sleep, then each pool process will get to process a task and the dataframe will be copied N times where N is the size of the pool (os.cpu_count()).

But even when the inherited memory must be copied, this is a relatively fast operation compared to the situation where you are passing a local dataframe as an argument, in which case the copying is done by using pickle to first serialize the dataframe in the main process and pickle again to de-serialize the dataframe in the child process.
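To make the pickle path concrete, here is a rough sketch of the round trip an argument goes through when it is handed to a pool worker: it is serialized in the parent and rebuilt as an independent object on the other side. An array of floats from the standard library stands in for the dataframe, and the helper name pickle_roundtrip is invented for this example.

```python
import pickle
from array import array

def pickle_roundtrip(obj):
    """Serialize obj and rebuild it, as happens to arguments sent to a pool worker."""
    payload = pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
    clone = pickle.loads(payload)
    return clone, len(payload)

if __name__ == '__main__':
    # Stand-in for the dataframe (assumption: any large picklable object behaves alike here).
    data = array('d', (float(i) for i in range(100_000)))
    clone, n_bytes = pickle_roundtrip(data)
    print('equal content:', clone == data)
    print('same object:  ', clone is data)
    print('payload bytes:', n_bytes)
```

With partial or starmap the dataframe travels with the submitted tasks, so this serialize-and-rebuild cost is paid repeatedly, whereas the global version only sends the small imax values.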

Summary

  1. Using a local dataframe is slower because copying a dataframe using pickle is slower than just doing a byte-for-byte copy.
  2. Using a global dataframe will result in lower memory utilization if your worker function is short-running so that a single pool process handles all submitted tasks resulting in the copy-on-write occurring only once.

Use Shared Memory To Reduce Memory Utilization

You can reduce memory by sharing a single copy of the dataframe among the processes:

  1. Construct your data numpy array.
  2. Copy a flattened version of the data array into a shared memory array shared_array.
  3. Construct a dataframe mydf based on the shared array.
  4. The processing pool is created specifying an initializer function that is executed once for each pool process. The initializer is passed the shared array from which it can reconstruct the sharable dataframe.

from time import sleep
import multiprocessing
import numpy as np
import pandas as pd

def np_array_to_shared_array(np_array, lock=False):
    """Construct a sharable array from a numpy array.
    Specify lock=True if multiple processes might be updating the same
    array element."""

    shared_array = multiprocessing.Array('B', np_array.nbytes, lock=lock)
    buffer = shared_array.get_obj() if lock else shared_array
    arr = np.frombuffer(buffer, np_array.dtype)
    arr[:] = np_array.flatten(order='C')
    return shared_array

def shared_array_to_np_array(shared_array, shape, dtype):
    """Reconstruct a numpy array from a shared array."""

    buffer = (
        shared_array.get_obj() if getattr(shared_array, 'get_obj', None)
        else shared_array
    )
    return np.ndarray(shape, dtype=dtype, buffer=buffer)

def df_from_shared_array(shared_array: multiprocessing.Array, shape: tuple[int], dtype: str):
    """Construct the dataframe based on a sharable array."""

    np_array = shared_array_to_np_array(shared_array, shape, dtype)
    return pd.DataFrame(np_array, columns=[f'col_{i}' for i in range(shape[1])])

def init_pool(shared_array: multiprocessing.Array, shape: tuple[int], dtype: str):
    """This is executed for each pool process and for each creates a global variable
    mydf."""
    
    global mydf

    mydf = df_from_shared_array(shared_array, shape, dtype)

def _fun(imax: int):
    sleep(1)
    return mydf[:imax]

def make_data():
    n_rows = 13_421_773
    n_cols = 10
    data = np.random.rand(n_rows, n_cols)

    shared_array = np_array_to_shared_array(data)
    mydf = df_from_shared_array(shared_array, data.shape, data.dtype)
    shape, dtype = data.shape, data.dtype
    del data  # We no longer need this
    return shared_array, shape, dtype, mydf

def dummy():
    shared_array, shape, dtype, mydf = make_data()

    imaxs = range(1, shape[1] + 1)

    with multiprocessing.Pool(
        shape[1],
        initializer=init_pool,
        initargs=(shared_array, shape, dtype)
    ) as pool:
        results = pool.map(_fun, imaxs)

    for result in results:
        print(result, end='\n\n')

if __name__ == '__main__':
    dummy()
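A quick way to convince yourself that wrapping the shared array does not copy it: the sketch below builds two numpy views over the same unlocked multiprocessing.Array and checks that a write through one is visible through the other. The helper name view_of and the tiny 2 x 3 shape are assumptions made for this illustration only.

```python
import multiprocessing
import numpy as np

def view_of(shared_array, shape, dtype):
    """Wrap an unlocked multiprocessing.Array in a numpy array without copying."""
    return np.ndarray(shape, dtype=dtype, buffer=shared_array)

if __name__ == '__main__':
    shape, dtype = (2, 3), np.dtype('float64')
    n_bytes = shape[0] * shape[1] * dtype.itemsize
    shared_array = multiprocessing.Array('B', n_bytes, lock=False)

    a = view_of(shared_array, shape, dtype)
    b = view_of(shared_array, shape, dtype)
    a[0, 0] = 42.0  # write through one view
    print(b[0, 0], np.shares_memory(a, b))  # read through the other
```

Whether pd.DataFrame then wraps that buffer or makes its own copy may depend on the pandas version and its copy settings, so a similar np.shares_memory check against mydf.to_numpy() is worth running in your environment.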

Upvotes: 1
