Reputation: 30256
This is basically a duplicate of "Keras + Tensorflow and Multiprocessing in Python", but my setup is a bit different, and the solution given there doesn't work for me.
I need to train a Keras model against predictions made by another model. The predictions are tied to some CPU-heavy code, so I would like to parallelize them and run that code in worker processes. Here is the code I would like to execute:
import numpy as np
from keras.layers import Input, Dense
from keras.models import Model
from keras.optimizers import Adam

def create_model():
    input_layer = Input((10,))
    dense = Dense(10)(input_layer)
    return Model(inputs=input_layer, outputs=dense)

model_outside = create_model()
model_outside.compile(Adam(1e-3), "mse")

def subprocess_routine(weights):
    model_inside = create_model()
    model_inside.set_weights(weights)
    while True:
        # lots of CPU
        batch = np.random.rand(10, 10)
        prediction = model_inside.predict(batch)
        yield batch, prediction

weights = model_outside.get_weights()
model_outside.fit_generator(subprocess_routine(weights),
                            epochs=10,
                            steps_per_epoch=100,
                            use_multiprocessing=True,
                            workers=1)
This produces an error:
E tensorflow/core/grappler/clusters/utils.cc:81] Failed to get device properties, error code: 3
I found the question above; its answer is to move the keras imports into the subprocess. I have moved all imports into subprocess_routine, but that doesn't change the error. It would probably be necessary to eliminate keras imports from the main process altogether, but in my setup that would mean huge refactorings.
Keras + multithreading seems to work; see the very last comment in this issue: https://github.com/keras-team/keras/issues/5640 . In my code, it looks like this:
import tensorflow as tf

model_inside = create_model()
model_inside._make_predict_function()
graph = tf.get_default_graph()

def subprocess_routine(model_inside, graph):
    while True:
        batch = np.random.rand(10, 10)
        with graph.as_default():
            prediction = model_inside.predict(batch)
        yield batch, prediction

model_outside.fit_generator(subprocess_routine(model_inside, graph),
                            epochs=10,
                            steps_per_epoch=100,
                            use_multiprocessing=True,
                            workers=1)
But the error message is identical.
Since the problem is apparently related to initialization of the subprocesses, I tried to create a new session in each subprocess:
def subprocess_routine(weights):
    import keras.backend as K
    import tensorflow as tf
    sess = tf.Session()
    K.set_session(sess)
    model_inside = create_model()
    model_inside.set_weights(weights)
    while True:
        batch = np.random.rand(10, 10)
        prediction = model_inside.predict(batch)
        yield batch, prediction
It produces a variation on the same error message:
E tensorflow/stream_executor/cuda/cuda_driver.cc:1300] could not retrieve CUDA device count: CUDA_ERROR_NOT_INITIALIZED
So again, the initialization seems broken.
How can I run Keras both in my main process and in subprocesses spawned by multiprocessing?
Upvotes: 1
Views: 10476
Reputation: 429
This technique is not working for me.
I am loading my saved model and passing it on as an argument. My error message is slightly different from the one posted:
E tensorflow/core/grappler/clusters/utils.cc:83] Failed to get device properties, error code: 3
I do not have any trouble running it outside of multiprocessing. Also, if it means anything, I'm using the Docker image tensorflow/tensorflow-gpu-py3, version 1.13.1.
Below is my object detection code. It takes an image and produces multiple scales of that image (an image pyramid), then processes one scale at a time. For each scale, it splits the image into smaller windows and sends each window to a processor. The processor then uses model.evaluate([window],[1]) to test whether the current window contains my object. If the probability is high, the window's box info is put in a queue and retrieved later (along with the values from other processes).
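For readers unfamiliar with the approach, here is a minimal sketch of the pyramid and sliding-window bookkeeping described above. It is not the code behind CalculateNumberOfScales or sliding_window_return (those helpers are not shown in this post); pyramid_sizes, window_origins and to_original_box are hypothetical names, and the sketch works on plain width/height numbers rather than on real images.

```python
def pyramid_sizes(width, height, scale=1.5, min_size=(32, 32)):
    """Return [(factor, w, h), ...] for each pyramid layer, largest first.

    Hypothetical helper: each layer is the original size divided by
    `factor`, and layers stop once they fall below `min_size`.
    """
    layers = []
    factor = 1.0
    w, h = width, height
    while w >= min_size[0] and h >= min_size[1]:
        layers.append((factor, w, h))
        factor *= scale
        w, h = int(width / factor), int(height / factor)
    return layers


def window_origins(width, height, win_w, win_h, step=4):
    """Top-left (x, y) of every full window that fits inside a layer."""
    return [(x, y)
            for y in range(0, height - win_h + 1, step)
            for x in range(0, width - win_w + 1, step)]


def to_original_box(x, y, win_w, win_h, factor):
    """Map a window on a scaled layer back to original-image coordinates."""
    start_x, start_y = int(factor * x), int(factor * y)
    end_x = int(start_x + factor * win_w)
    end_y = int(start_y + factor * win_h)
    return (start_x, start_y, end_x, end_y)
```

The box mapping follows the same arithmetic as the startX/endX lines in f2; everything else is an assumption about how such helpers are typically written.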
Here is my code:
def start_detection_mp3(image,winDim, minSize, winStep=4, pyramidScale=1.5, minProb=0.7):
    # Code to use multiple processors (mp)
    boxes=[]
    probs=[]
    print("Loading CNN Keras Model .... ")
    checkpoint_path="trainedmodels/cp.ckpt"
    mymodel=create_CNN_model(2,winDim[0],winDim[1])
    mymodel.load_weights(checkpoint_path)
    mymodel._make_predict_function()
    (keepscale,keeplayer)=CalculateNumberOfScales(image,pyramidScale,minSize)
    printinfo("There are {} scales in this image.".format(len(keepscale)))
    for i in range(0,len(keepscale)):
        printinfo("Working on layer {0:4d}. Scale {1:.2f}".format(i,keepscale[i]))
        (b,p)=detect_single_layer_mp3(keeplayer[i],keepscale[i],winStep,winDim,minProb,mysess,mymodel)
        boxes =boxes + b
        probs =probs + p
    mysess.close()
    return(boxes,probs)

def detect_single_layer_mp3(layer,scale,winStep,winDim,minProb,mysess,mymodel):
    # Use multiple processors
    q=[]
    p=[]
    d=[]
    i=0
    boxes=[]
    probs=[]
    xx, yy, windows= sliding_window_return(layer, winStep, winDim)
    # process in chunks of 4 (for four processors)
    NumOfProcessors=4;
    for aa in range(0,len(xx)-1,4):
        for ii in range(0,NumOfProcessors):
            ##print("aa: {} ii: {}".format(aa,ii))
            printinfo("Processes {} of Loop {}".format(ii,aa))
            x=xx[aa]
            y=yy[aa]
            window=windows[aa]
            q=Queue() # Only need to create one Queue (FIFO buffer) to hold output from each process
            # when all processes are completed, the buffer will be emptied.
            p.append(Process(target=f2,args=(x,y,window,scale, minProb,winDim,q,mysess,mymodel)))
            pp=p[-1] # get last
            printinfo("Starting process {}".format(pp))
            pp.start()
            pp.join()
            while not q.empty():
                d=q.get()
                boxes = boxes + d[0]
                probs = probs + d[1]
            p=[] # Clear Processes
    p=[]
    q=[]
    return(boxes,probs)

def f2(x,y,window,scale,minProb,winDim,q,mysess,mymodel):
    processID = os.getpid()
    boxes=[]
    probs=[]
    isHOG = 0
    isCNN = 0
    isCNN_Keras=1
    (winH, winW) = window.shape[:2]
    if winW == winDim[0] and winH ==winDim[1]: # Check that window dimension is
        if isCNN_Keras ==1:
            ### TODO It appears that it is freezing at the prediction step
            printinfo("Process id: {} Starting test against CNN model".format(processID))
            window=window.reshape(-1,winH,winW,1)
            loss,prob = mymodel.evaluate([window],[1])
            print("Loss: {} Accuracy: {}".format(loss,prob))
        if prob > minProb:
            printinfo("*** [INFO] ProcessID: {0:7d} Probability: {1:.3f} Scale {2:.3f} ***".format(processID,prob,scale))
            # compute the (x, y)-coordinates of the bounding box using the current
            # scale of the image pyramid
            (startX, startY) = (int(scale * x), int(scale * y))
            endX = int(startX + (scale * winW))
            endY = int(startY + (scale * winH))
            # update the list of bounding boxes and probabilities
            boxes.append((startX, startY, endX, endY))
            probs.append(prob)
    # return a tuple of the bounding boxes and probabilities
    if q!=1:
        q.put([boxes,probs])
        q.close()
        q=[]
    else:
        return(boxes,probs)
Upvotes: 0
Reputation: 30256
The good news is that TensorFlow sessions are thread-safe; see "Is it thread-safe when using tf.Session in inference service?"
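Because of that thread-safety, one option is to stay in a single process and let background threads feed the training generator. The following is only a sketch under assumptions: threaded_generator and make_pair are hypothetical names, make_pair stands for whatever builds one (batch, prediction) pair (for example by calling model_inside.predict inside graph.as_default()), and threads only speed things up if the CPU-heavy part releases the GIL.

```python
import queue
import threading


def threaded_generator(make_pair, n_threads=2, max_pending=8):
    """Yield items produced by background threads calling make_pair().

    make_pair is a hypothetical zero-argument callable that returns one
    (batch, prediction) pair; it must be safe to call from several threads.
    """
    out = queue.Queue(maxsize=max_pending)  # bounded, so producers cannot run ahead forever

    def loop():
        while True:
            out.put(make_pair())  # blocks while the queue is full

    # Daemon threads do not keep the interpreter alive once training ends.
    for _ in range(n_threads):
        threading.Thread(target=loop, daemon=True).start()

    while True:
        yield out.get()
```

The resulting generator could then be handed to fit_generator with use_multiprocessing=False, since the parallelism already lives in the threads.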
To use a Keras model in multiple processes, you have to call _make_predict_function() on the model up front and then run every prediction inside the default graph's as_default() context.
Here is some sample code:
# the usual imports
import numpy as np
import tensorflow as tf
from keras.models import *
from keras.layers import *
# set up the model
i = Input(shape=(10,))
b = Dense(1)(i)
model = Model(inputs=i, outputs=b)
# now to use it in multiprocessing, the following is necessary
model._make_predict_function()
sess = tf.Session()
sess.run(tf.global_variables_initializer())
default_graph = tf.get_default_graph()
default_graph.finalize()
# now you share the model and graph between processes
# in each process you can call this:
with default_graph.as_default():
    return model.predict(something)
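If sharing one graph is not an option, an alternative is to run your own worker processes and hand their output to the main process through a queue. The sketch below is an outline under assumptions, not a verified fix for the errors above: it assumes the CUDA errors come from forking a parent that has already initialised TensorFlow, so it uses the "spawn" start method and builds the model inside the child. build_predict_fn is a hypothetical stand-in for create_model() plus set_weights(), and the other function names are made up for illustration as well.

```python
import multiprocessing as mp
import random


def build_predict_fn(weights):
    """Hypothetical stand-in for create_model() + set_weights(weights).

    In a real worker, keras/tensorflow would be imported here, so that
    TensorFlow is initialised in the child rather than inherited.
    """
    def predict(batch):
        # toy "model": scale every feature by the first weight
        return [[weights[0] * x for x in row] for row in batch]
    return predict


def produce_pair(predict, rng, rows=10, cols=10):
    """Stand-in for the CPU-heavy batch creation followed by a prediction."""
    batch = [[rng.random() for _ in range(cols)] for _ in range(rows)]
    return batch, predict(batch)


def worker(weights, out_queue, seed):
    """Runs in the child process: build the model once, then produce forever."""
    predict = build_predict_fn(weights)
    rng = random.Random(seed)
    while True:
        out_queue.put(produce_pair(predict, rng))


def start_workers(weights, n_workers=2, max_pending=8):
    """Start spawn-context workers and return (queue, processes)."""
    ctx = mp.get_context("spawn")  # fresh interpreter per worker, no forked state
    out_queue = ctx.Queue(maxsize=max_pending)
    procs = [ctx.Process(target=worker, args=(weights, out_queue, seed), daemon=True)
             for seed in range(n_workers)]
    for proc in procs:
        proc.start()
    return out_queue, procs


def queue_generator(out_queue):
    """Plain generator for the main process; Keras needs no multiprocessing."""
    while True:
        yield out_queue.get()
```

In the main script, under an `if __name__ == "__main__":` guard (which the spawn start method requires), one would call start_workers(model.get_weights()) and pass queue_generator(q) to fit_generator with use_multiprocessing=False.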
Upvotes: 4