Reputation: 1
I'm trying to parallelize a process with ProcessPoolExecutor on python 3.12. To move a large dataset around I've created a numpy array of objects that I am trying to share with each process. The code works as expected on POSIX, but on windows on the same machine, when attempting to access the shared memory buffer in the subprocess, the process breaks and results in a BrokenProcessError. I've tested and am sure the system is not running out of memory and causing the issue. I recognize that I could split the objects up into two arrays of x and r values, but I'm trying to make something that can be easily extended to more complex multiprocessing tasks in the futures. Thank you in advance.
from dataclasses import dataclass
import time
import concurrent
from fluids.gas import Gas
import numpy as np
import matplotlib.pyplot as plt
from scipy.optimize import fsolve
import multiprocessing as mp
from multiprocessing import shared_memory
from multiprocessing import Queue
import concurrent.futures
import joblib
from icecream import ic
from Cooling.material import DomainMaterial
from Cooling import material
from Nozzle import plots
from General.units import Q_, unitReg
from fluids import gas
mcQ = Queue()
SHAREDMEMNAME = 'CoolingDomain'
@dataclass
class CoolingChannel:
upperContour: np.ndarray
lowerContour: np.ndarray
@dataclass
class DomainPoint:
x: float
r: float
area: float
material: DomainMaterial = DomainMaterial.FREE
border: bool = False
temperature: Q_ = Q_(70, unitReg.degF)
pressure: Q_ = Q_(12.1, unitReg.psi)
velocity: Q_ = Q_(0, unitReg.ft/unitReg.s)
hydraulicDiameter: Q_ = Q_(0, unitReg.inch)
previousFlow: tuple[int, int] = (0,0)
flowHeight: Q_ = Q_(0, unitReg.inch)
class DomainMC:
array: np.ndarray
x0: float
r0: float
width: float
height: float
xstep: float
rstep: float
hpoints: int
vpoints: int
def __init__(self, x0, r0, width, height, ds = .1):
hpoints = int(width/ds) + 1
vpoints = int(height/ds) + 1
self.array = np.empty((vpoints, hpoints), dtype=DomainPoint)
self.x0 = x0
self.r0 = r0
self.width = width
self.height = height
self.hpoints = hpoints
self.vpoints = vpoints
self.xstep = width/(hpoints-1)
self.rstep = height/(vpoints-1)
for i in range(vpoints):
for j in range(hpoints):
self.array[i,j] = DomainPoint(x0 + j*self.xstep, r0 - i*self.rstep, self.xstep*self.rstep)
def DefineMaterials(self, cowl: np.ndarray, coolant: np.ndarray, chamber: np.ndarray, plug: np.ndarray, max_cores = mp.cpu_count() - 1):
MAX_CORES = max_cores
shm = None
tic = time.perf_counter()
try:
shm = shared_memory.SharedMemory(create=True, size=self.array.nbytes, name=SHAREDMEMNAME)
except FileExistsError: # clean up from a previous crash
shm = shared_memory.SharedMemory(name=SHAREDMEMNAME)
shm.unlink()
shm = shared_memory.SharedMemory(create=True, size=self.array.nbytes, name=SHAREDMEMNAME)
newarray = np.ndarray(self.array.shape, dtype=DomainPoint, buffer=shm.buf)
newarray[:] = self.array[:]
self.array = newarray.copy()
# use a pool of processes to parallelize the computation
with concurrent.futures.ProcessPoolExecutor(max_workers=MAX_CORES, initializer=init_pool_processes, initargs=(mcQ,)) as executor:
for i in range(self.vpoints):
for j in range(self.hpoints):
# q.append((i, j))
mcQ.put((i, j))
print(mcQ.qsize())
futures = []
print(f"Starting parallel computation with {MAX_CORES} cores")
for i in range(MAX_CORES):
futures.append(executor.submit(EvalMaterialProcess, i, SHAREDMEMNAME, self.array.shape, (self.width, self.height), coolant, cowl, chamber, plug))
print("Tasks submitted")
for future in concurrent.futures.as_completed(futures):
res = future.result(timeout=None)
for i, j, mat in res:
self.array[i,j].material = mat
print("Parallel computation done")
shm.close()
shm.unlink()
toc = time.perf_counter()
print("material defined")
print(f"Time to define materials: {toc - tic}")
def EvalMaterialProcess(pn, shmName, shape, size, coolant, cowl, chamber, plug):
res = []
global mcQ
shm = shared_memory.SharedMemory(name=shmName)
domain = np.ndarray(shape, dtype=DomainPoint, buffer=shm.buf)
print(f"Starting process {pn + 1}", flush=True)
prevPercent = 0
total = np.prod(shape)
while mcQ.qsize() > 0:
i, j = mcQ.get()
res.append(AssignMaterial(domain, i, j, size, coolant, cowl, chamber, plug))
if pn == 0:
curPercent = int((total - mcQ.qsize())/total * 100)
if prevPercent < curPercent:
prevPercent = curPercent
print(f"Progress: {prevPercent}%")
print("done")
shm.close()
return res
def AssignMaterial(domain, i, j, size, coolant, cowl, chamber, plug):
if material.isIntersect(domain[i][j], coolant, size):
return (i, j, DomainMaterial.COOLANT)
if material.isIntersect(domain[i][j], cowl, size):
return (i, j, DomainMaterial.COWL)
if material.isIntersect(domain[i][j], chamber, size):
return (i, j, DomainMaterial.CHAMBER)
if material.isIntersect(domain[i][j], plug, size):
return (i, j, DomainMaterial.PLUG)
return (i, j, DomainMaterial.FREE)
def init_pool_processes(q):
global mcQ
mcQ = q
Upvotes: 0
Views: 48