Reputation: 101
Using concurrent.futures.ProcessPoolExecutor, I am trying to run the first piece of code below to execute the function Calculate_Forex_Data_Derivatives(data, grid_spacing) in parallel. When collecting the results with executors_list[i].result(), I get "BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending." I have tried sending multiple calls of the function to the processing pool as well as sending only a single call, and both result in the error.
I have also tested the structure of the code with a simpler piece of code (the second one provided) using the same types of inputs for the submitted function, and it works fine. The only difference I can see between the two pieces of code is that the first one calls the function FinDiff(axis, grid_spacing, derivative_order) from the findiff module. This function, along with Calculate_Forex_Data_Derivatives(data, grid_spacing), works perfectly on its own when run normally in series.
I am using an Anaconda environment, the Spyder editor, and Windows.
Any help would be appreciated.
#code that returns "BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending."
import pandas as pd
import numpy as np
from findiff import FinDiff
import multiprocessing
import concurrent.futures
def Calculate_Forex_Data_Derivatives(forex_data, dt):  # function to run in parallel
    try:
        dClose_dt = FinDiff(0, dt, 1)(forex_data)[-1]
    except IndexError:
        dClose_dt = np.nan
    try:
        d2Close_dt2 = FinDiff(0, dt, 2)(forex_data)[-1]
    except IndexError:
        d2Close_dt2 = np.nan
    try:
        d3Close_dt3 = FinDiff(0, dt, 3)(forex_data)[-1]
    except IndexError:
        d3Close_dt3 = np.nan
    return dClose_dt, d2Close_dt2, d3Close_dt3
#input for function
#forex_data is pandas dataframe, forex_data['Close'].values is numpy array
#dt is numpy array
#input_1 and input_2 are each a list of numpy arrays
input_1 = []
input_2 = []
for forex_data_index, data_point in enumerate(forex_data['Close'].values[:1]):
    input_1.append(forex_data['Close'].values[:forex_data_index + 1])
    input_2.append(dt[:forex_data_index + 1])
def multi_processing():
    executors_list = []
    with concurrent.futures.ProcessPoolExecutor(max_workers=multiprocessing.cpu_count()) as executor:
        for index in range(len(input_1)):
            executors_list.append(executor.submit(Calculate_Forex_Data_Derivatives, input_1[index], input_2[index]))
    return executors_list

if __name__ == '__main__':
    print('calculating derivatives')
    executors_list = multi_processing()
    for output in executors_list:
        print(output.result())  # raises "BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending."
##############################################################
#simple example that runs fine
def function(x, y):  # function to run in parallel
    try:
        asdf
    except NameError:
        a = (x * y)[0]
        b = (x + y)[0]
    return a, b
x=[np.array([0,1,2]),np.array([3,4,5])] #function inputs, list of numpy arrays
y=[np.array([6,7,8]),np.array([9,10,11])]
def multi_processing():
    executors_list = []
    with concurrent.futures.ProcessPoolExecutor(max_workers=multiprocessing.cpu_count()) as executor:
        for index, _ in enumerate(x):
            executors_list.append(executor.submit(function, x[index], y[index]))
    return executors_list

if __name__ == '__main__':
    executors_list = multi_processing()
    for output in executors_list:  # prints as expected
        print(output.result())  # (0, 6)
                                # (27, 12)
Upvotes: 9
Views: 32990
Reputation: 124
I found this in the official documentation:
"The main module must be importable by worker subprocesses. This means that ProcessPoolExecutor will not work in the interactive interpreter. Calling Executor or Future methods from a callable submitted to a ProcessPoolExecutor will result in deadlock."
Have you ever tried this? The following works for me:
if __name__ == '__main__':
    executors_list = multi_processing()
    for output in executors_list:
        print(output.result())
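For completeness, here is what the whole script then looks like — a minimal sketch (the function name work and the inputs are placeholders), assuming it is saved as a .py file and run as a script rather than pasted into the interactive console, since the main module must be importable by the workers:

import concurrent.futures

def work(n):  # must be defined at module top level so the worker processes can import it
    return n * n

if __name__ == '__main__':  # guard: workers import the main module and must not re-run the pool setup
    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = [executor.submit(work, n) for n in range(4)]
        for future in futures:
            print(future.result())  # 0, 1, 4, 9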
Upvotes: 7
Reputation: 711
I know three typical ways to break the pipe of a ProcessPoolExecutor:

1. The operating system kills the worker. Your system runs into limits, most likely memory, and starts killing processes. As a fork on Windows clones your memory content, this is not unlikely when working with large DataFrames. This case should not appear with max_workers=1, although that is not unambiguous.

2. The worker terminates itself. The Python instance of the subprocess dies due to some error that does not raise a proper Exception; one example would be a segfault in an imported C-module. As your code runs properly without the ProcessPoolExecutor, the only scenario I can think of is that some module is not multiprocessing-safe. It then also has a chance to disappear with max_workers=1. It might also be possible to provoke the error in the main process by calling the function manually right after the workers are created, i.e. on the line after the for-loop that calls executor.submit (see the sketch below). Otherwise it could be really hard to identify, but in my opinion this is the most unlikely case.
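A minimal sketch of that debugging trick, applied to the multi_processing function from the question (only the commented lines and the extra call are new):

def multi_processing():
    executors_list = []
    with concurrent.futures.ProcessPoolExecutor(max_workers=multiprocessing.cpu_count()) as executor:
        for index in range(len(input_1)):
            executors_list.append(executor.submit(Calculate_Forex_Data_Derivatives, input_1[index], input_2[index]))
        # debugging aid: run the same call in the main process while the workers exist;
        # if a module is not multiprocessing-safe, this may reproduce the crash with a usable traceback
        Calculate_Forex_Data_Derivatives(input_1[0], input_2[0])
    return executors_list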
3. The communication layer of the pool crashes. The subprocess side of the pipe (i.e. the code handling the communication) may crash, which results in a proper Exception that unfortunately cannot be communicated to the master process. As the code is (hopefully) well tested, the prime suspect is the return data. It must be pickled and sent back via a socket, and both steps can crash. So you have to check whether the return data is picklable at all and whether the pickled object is small enough to be sent (about 2 GB). You can either try to return some simple dummy data instead, or check the two conditions explicitly:
if len(pickle.dumps((dClose_dt, d2Close_dt2, d3Close_dt3))) > 2 * 10 ** 9:
    raise RuntimeError('return data can not be sent!')
In Python 3.7, this problem is fixed, and it sends back the Exception.
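To probe the first of those two conditions (picklability) separately, something along these lines can be used — a sketch, where check_return_data is a made-up helper name, not part of concurrent.futures:

import pickle

def check_return_data(obj):  # hypothetical helper; raises if obj would break the pool's result pipe
    try:
        payload = pickle.dumps(obj)
    except Exception as exc:  # pickling failures surface as PicklingError, TypeError, etc.
        raise RuntimeError('return data can not be pickled!') from exc
    if len(payload) > 2 * 10 ** 9:  # the size limit used in the check above
        raise RuntimeError('return data can not be sent!')

# e.g. call it in the worker just before returning:
# check_return_data((dClose_dt, d2Close_dt2, d3Close_dt3))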
Upvotes: 22