David Anderson
David Anderson

Reputation: 1

Shared memory with ProcessPoolExecutor

I'm trying to parallelize a process with ProcessPoolExecutor on python 3.12. To move a large dataset around I've created a numpy array of objects that I am trying to share with each process. The code works as expected on POSIX, but on windows on the same machine, when attempting to access the shared memory buffer in the subprocess, the process breaks and results in a BrokenProcessError. I've tested and am sure the system is not running out of memory and causing the issue. I recognize that I could split the objects up into two arrays of x and r values, but I'm trying to make something that can be easily extended to more complex multiprocessing tasks in the futures. Thank you in advance.

from dataclasses import dataclass
import time
import concurrent
from fluids.gas import Gas
import numpy as np
import matplotlib.pyplot as plt
from scipy.optimize import fsolve

import multiprocessing as mp
from multiprocessing import shared_memory
from multiprocessing import Queue
import concurrent.futures
import joblib

from icecream import ic

from Cooling.material import DomainMaterial
from Cooling import material
from Nozzle import plots
from General.units import Q_, unitReg
from fluids import gas

mcQ = Queue()
SHAREDMEMNAME = 'CoolingDomain'

@dataclass
class CoolingChannel:
    upperContour: np.ndarray
    lowerContour: np.ndarray    

@dataclass
class DomainPoint:
    x: float
    r: float
    area: float
    material: DomainMaterial = DomainMaterial.FREE
    border: bool = False
    temperature: Q_ = Q_(70, unitReg.degF)
    pressure: Q_ = Q_(12.1, unitReg.psi)
    velocity: Q_ = Q_(0, unitReg.ft/unitReg.s)
    hydraulicDiameter: Q_ = Q_(0, unitReg.inch)
    previousFlow: tuple[int, int] = (0,0)
    flowHeight: Q_ = Q_(0, unitReg.inch)

class DomainMC:
    array: np.ndarray
    x0: float
    r0: float
    width: float
    height: float
    xstep: float
    rstep: float
    hpoints: int
    vpoints: int

    def __init__(self, x0, r0, width, height, ds = .1):        
        hpoints = int(width/ds) + 1
        vpoints = int(height/ds) + 1
        
        self.array = np.empty((vpoints, hpoints), dtype=DomainPoint)

        self.x0 = x0
        self.r0 = r0
        self.width = width
        self.height = height
        self.hpoints = hpoints
        self.vpoints = vpoints
        self.xstep = width/(hpoints-1)
        self.rstep = height/(vpoints-1)
        for i in range(vpoints):
            for j in range(hpoints):
                self.array[i,j] = DomainPoint(x0 + j*self.xstep, r0 - i*self.rstep, self.xstep*self.rstep)

    def DefineMaterials(self, cowl: np.ndarray, coolant: np.ndarray, chamber: np.ndarray, plug: np.ndarray, max_cores = mp.cpu_count() - 1):
        MAX_CORES = max_cores
        shm = None
        tic = time.perf_counter()
        try:
            shm = shared_memory.SharedMemory(create=True, size=self.array.nbytes, name=SHAREDMEMNAME)
        except FileExistsError: # clean up from a previous crash
            shm = shared_memory.SharedMemory(name=SHAREDMEMNAME)
            shm.unlink()
            shm = shared_memory.SharedMemory(create=True, size=self.array.nbytes, name=SHAREDMEMNAME)
        
        newarray = np.ndarray(self.array.shape, dtype=DomainPoint, buffer=shm.buf)
        newarray[:] = self.array[:]

        self.array = newarray.copy()

        # use a pool of processes to parallelize the computation
        with concurrent.futures.ProcessPoolExecutor(max_workers=MAX_CORES, initializer=init_pool_processes, initargs=(mcQ,)) as executor:

            for i in range(self.vpoints):
                for j in range(self.hpoints):
                    # q.append((i, j))
                    mcQ.put((i, j))

            print(mcQ.qsize())

            futures = []
            print(f"Starting parallel computation with {MAX_CORES} cores")
            for i in range(MAX_CORES):
                futures.append(executor.submit(EvalMaterialProcess, i, SHAREDMEMNAME, self.array.shape, (self.width, self.height), coolant, cowl, chamber, plug))
            print("Tasks submitted")
            for future in concurrent.futures.as_completed(futures):
                res = future.result(timeout=None)
                for i, j, mat in res:
                    self.array[i,j].material = mat

        print("Parallel computation done")
        shm.close()
        shm.unlink()
        toc = time.perf_counter()
        print("material defined")
        print(f"Time to define materials: {toc - tic}")

def EvalMaterialProcess(pn, shmName, shape, size, coolant, cowl, chamber, plug):
    res = []
    global mcQ
    shm = shared_memory.SharedMemory(name=shmName)
    domain = np.ndarray(shape, dtype=DomainPoint, buffer=shm.buf)
    print(f"Starting process {pn + 1}", flush=True)
    prevPercent = 0
    total = np.prod(shape)
    while mcQ.qsize() > 0:
        i, j = mcQ.get()
        res.append(AssignMaterial(domain, i, j, size, coolant, cowl, chamber, plug))
        if pn == 0:
            curPercent = int((total - mcQ.qsize())/total * 100)
            if prevPercent < curPercent:
                prevPercent = curPercent
                print(f"Progress: {prevPercent}%")
    print("done")
    shm.close()
    return res

def AssignMaterial(domain, i, j, size, coolant, cowl, chamber, plug):
    if material.isIntersect(domain[i][j], coolant, size):
        return (i, j, DomainMaterial.COOLANT)
    if material.isIntersect(domain[i][j], cowl, size):
        return (i, j, DomainMaterial.COWL)
    if material.isIntersect(domain[i][j], chamber, size):
        return (i, j, DomainMaterial.CHAMBER)
    if material.isIntersect(domain[i][j], plug, size):
        return (i, j, DomainMaterial.PLUG)
    return (i, j, DomainMaterial.FREE)

def init_pool_processes(q):
    global mcQ
    mcQ = q

Upvotes: 0

Views: 48

Answers (0)

Related Questions