lhk

Reputation: 30256

Use keras in multiprocessing

This is basically a duplicate of "Keras + Tensorflow and Multiprocessing in Python", but my setup is a bit different and the solution given there doesn't work for me.

I need to train a Keras model against predictions made by another model. The predictions are tied to some CPU-heavy code, so I would like to parallelize them and run that code in worker processes. Here is the code I would like to execute:

import numpy as np

from keras.layers import Input, Dense
from keras.models import Model
from keras.optimizers import Adam

def create_model():
    input_layer = Input((10,))
    dense = Dense(10)(input_layer)

    return Model(inputs=input_layer, outputs=dense)

model_outside = create_model()
model_outside.compile(Adam(1e-3), "mse")

def subprocess_routine(weights):
    model_inside = create_model()
    model_inside.set_weights(weights)

    while True:
        # lots of CPU
        batch = np.random.rand(10, 10)
        prediction = model_inside.predict(batch)

        yield batch, prediction

weights = model_outside.get_weights()

model_outside.fit_generator(subprocess_routine(weights),
                            epochs=10,
                            steps_per_epoch=100,
                            use_multiprocessing=True,
                            workers=1)

This produces an error:

E tensorflow/core/grappler/clusters/utils.cc:81] Failed to get device properties, error code: 3

I found the question linked above; its answer is to move the Keras imports into the subprocess. I moved all imports into subprocess_routine, but that doesn't change the error. It would probably be necessary to eliminate Keras imports from the main process altogether, but in my setup that would mean a huge refactoring.

Keras + multithreading seems to work, according to the very last comment in this issue: https://github.com/keras-team/keras/issues/5640. In my code, it looks like this:

import tensorflow as tf

model_inside = create_model()
model_inside._make_predict_function()

graph = tf.get_default_graph()

def subprocess_routine(model_inside, graph):

    while True:
        batch = np.random.rand(10, 10)

        with graph.as_default():
            prediction = model_inside.predict(batch)

        yield batch, prediction

model_outside.fit_generator(subprocess_routine(model_inside, graph),
                            epochs=10,
                            steps_per_epoch=100,
                            use_multiprocessing=True,
                            workers=1)

But the error message is identical.

Since the problem is apparently related to initialization of the subprocesses, I tried to create a new session in each subprocess:

def subprocess_routine(weights):

    import keras.backend as K
    import tensorflow as tf
    sess = tf.Session()
    K.set_session(sess)

    model_inside = create_model()
    model_inside.set_weights(weights)

    while True:
        batch = np.random.rand(10, 10)
        prediction = model_inside.predict(batch)

        yield batch, prediction

It produces a variation on the same error message:

E tensorflow/stream_executor/cuda/cuda_driver.cc:1300] could not retrieve CUDA device count: CUDA_ERROR_NOT_INITIALIZED

So again, the initialization seems broken.

How can I run Keras both in my main process and in subprocesses spawned by multiprocessing?

Upvotes: 1

Views: 10476

Answers (2)

Adrian Hood Sr

Reputation: 429

This technique is not working for me.

I am loading my saved model and passing it on as an argument. My error message is slightly different from the one posted:

E tensorflow/core/grappler/clusters/utils.cc:83] Failed to get device properties, error code: 3

I do not have any trouble running it outside of multiprocessing. Also, in case it matters, I'm using the Docker image tensorflow/tensorflow-gpu-py3, version 1.13.1.

My object detection code takes an image and produces multiple scales of that image (called an image pyramid). It then processes one scale at a time. For each scale, it splits the image into smaller windows and sends each window to a processor. The processor then uses model.evaluate([window],[1]) to test whether the current window contains my object. If the probability is high, the window's box info is stored in a queue and retrieved later (along with the values from the other processes).

Here is my code:

def start_detection_mp3(image,winDim, minSize,  winStep=4, pyramidScale=1.5, minProb=0.7):
    # Code to use multiple processors (mp)
    boxes=[]
    probs=[]
    print("Loading CNN Keras Model .... ")
    checkpoint_path="trainedmodels/cp.ckpt"
    mymodel=create_CNN_model(2,winDim[0],winDim[1])
    mymodel.load_weights(checkpoint_path)
    mymodel._make_predict_function()
    (keepscale,keeplayer)=CalculateNumberOfScales(image,pyramidScale,minSize)
    printinfo("There are {} scales in this image.".format(len(keepscale)))
    for i in range(0,len(keepscale)):
        printinfo("Working on layer {0:4d}. Scale {1:.2f}".format(i,keepscale[i]))
        (b,p)=detect_single_layer_mp3(keeplayer[i],keepscale[i],winStep,winDim,minProb,mysess,mymodel)

        boxes =boxes + b
        probs =probs + p
    mysess.close()
    return(boxes,probs)

def detect_single_layer_mp3(layer,scale,winStep,winDim,minProb,mysess,mymodel): 
    # Use multiple processors
    q=[]
    p=[]
    d=[]
    i=0
    boxes=[]
    probs=[]
    xx, yy, windows= sliding_window_return(layer, winStep, winDim)
    # process in chunks of 4 (for four processors)
    NumOfProcessors=4
    for aa in range(0,len(xx)-1,4):
        for ii in range(0,NumOfProcessors):
            ##print("aa: {}  ii: {}".format(aa,ii))
            printinfo("Processes {} of Loop {}".format(ii,aa))
            x=xx[aa]
            y=yy[aa]
            window=windows[aa]
            q=Queue() # Only need to create one Queue (FIFO buffer) to hold output from each process
            # when all processes are completed, the buffer will be emptied.
            p.append(Process(target=f2,args=(x,y,window,scale, minProb,winDim,q,mysess,mymodel)))
            pp=p[-1] # get last
            printinfo("Starting process {}".format(pp))
            pp.start()
            pp.join()

        while not q.empty():
            d=q.get()
            boxes = boxes + d[0]
            probs = probs + d[1]

        p=[]  # Clear processes
        q=[]

    return(boxes,probs)


def f2(x,y,window,scale,minProb,winDim,q,mysess,mymodel):
    processID = os.getpid()
    boxes=[]
    probs=[]
    isHOG = 0
    isCNN = 0
    isCNN_Keras=1
    (winH, winW) = window.shape[:2]
    if winW == winDim[0] and winH ==winDim[1]: # Check that window dimension is 
        if isCNN_Keras ==1:
            ### TODO  It appears that it is freezing at the prediction step                     
            printinfo("Process id: {} Starting test against CNN model".format(processID))
            window=window.reshape(-1,winH,winW,1)
            loss,prob = mymodel.evaluate([window],[1])
            print("Loss: {}  Accuracy: {}".format(loss,prob))

            if prob > minProb:
                printinfo("*** [INFO] ProcessID: {0:7d} Probability: {1:.3f}  Scale {2:.3f} ***".format(processID,prob,scale))
                # compute the (x, y)-coordinates of the bounding box using the current
                # scale of the image pyramid
                (startX, startY) = (int(scale * x), int(scale * y))
                endX = int(startX + (scale * winW))
                endY = int(startY + (scale * winH))

                # update the list of bounding boxes and probabilities
                boxes.append((startX, startY, endX, endY))
                probs.append(prob)      
    # return a tuple of the bounding boxes and probabilities            
    if q!=1:        
        q.put([boxes,probs])
        q.close()
        q=[]
    else:
        return(boxes,probs)
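
The helpers CalculateNumberOfScales and sliding_window_return are not shown above. As a rough illustration only, the pyramid and sliding-window bookkeeping could look something like the sketch below. The function names, the (width, height) tuple convention, and the stopping rule are assumptions made for this example, not the actual helpers; only the box mapping in to_original_box mirrors the arithmetic used in f2.

```python
def pyramid_scales(image_size, pyramid_scale=1.5, min_size=(30, 30)):
    """Yield (scale, (width, height)) for each pyramid layer.

    Layer 0 is the original image (scale 1.0). Each following layer is
    shrunk by `pyramid_scale` until it would be smaller than `min_size`.
    """
    width, height = image_size
    scale = 1.0
    while width >= min_size[0] and height >= min_size[1]:
        yield scale, (width, height)
        scale *= pyramid_scale
        width = int(image_size[0] / scale)
        height = int(image_size[1] / scale)


def sliding_window_origins(layer_size, win_step, win_dim):
    """Yield the top-left (x, y) of every full window inside a layer."""
    layer_w, layer_h = layer_size
    win_w, win_h = win_dim
    for y in range(0, layer_h - win_h + 1, win_step):
        for x in range(0, layer_w - win_w + 1, win_step):
            yield x, y


def to_original_box(x, y, scale, win_dim):
    """Map a window on a scaled layer back to original-image coordinates."""
    win_w, win_h = win_dim
    start_x, start_y = int(scale * x), int(scale * y)
    end_x = int(start_x + scale * win_w)
    end_y = int(start_y + scale * win_h)
    return start_x, start_y, end_x, end_y


# Example: a 90x60 image, 30x30 windows, step of 10 pixels.
layers = list(pyramid_scales((90, 60), pyramid_scale=1.5, min_size=(30, 30)))
origins = list(sliding_window_origins(layers[1][1], win_step=10, win_dim=(30, 30)))
```

Each (x, y) origin corresponds to one window that would be handed to a worker, and to_original_box converts a hit on a scaled layer back to coordinates in the full-size image.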

Upvotes: 0

lhk

Reputation: 30256

The good news is that TensorFlow sessions are thread-safe; see "Is it thread-safe when using tf.Session in inference service?"

To use a keras model in multiple processes, you have to do the following:

  • set up the model
  • call _make_predict_function()
  • set up a session and use it to get the tensorflow graph
  • finalize this graph
  • every time you predict something, do so inside a "with graph.as_default():" block

Here is some sample code:

# the usual imports
import numpy as np
import tensorflow as tf

from keras.models import Model
from keras.layers import Input, Dense

# set up the model
i = Input(shape=(10,))
b = Dense(1)(i)
model = Model(inputs=i, outputs=b)

# now to use it in multiprocessing, the following is necessary
model._make_predict_function()
sess = tf.Session()
sess.run(tf.global_variables_initializer())
default_graph = tf.get_default_graph()
default_graph.finalize()

# now you share the model and graph between processes
# in each process you can call this:
def predict_in_worker(something):
    with default_graph.as_default():
        return model.predict(something)
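
Since the thread-safety guarantee is about threads, one way to try the pattern is with a thread pool rather than separate processes. The following is a minimal sketch under that assumption. To keep it runnable without TensorFlow, predict is a hypothetical stand-in for model.predict, and make_sample is a placeholder for the CPU-heavy batch preparation; neither name comes from Keras.

```python
import random
from concurrent.futures import ThreadPoolExecutor


def predict(batch):
    # Hypothetical stand-in for model.predict(batch): one number per row.
    # With Keras on TensorFlow 1.x, the real call would sit inside
    # `with default_graph.as_default():` as in the sample code above.
    return [sum(row) for row in batch]


def make_sample(seed):
    # Placeholder for the CPU-heavy work that builds a 10x10 batch.
    rng = random.Random(seed)
    batch = [[rng.random() for _ in range(10)] for _ in range(10)]
    return batch, predict(batch)


# All worker threads share the same predict function (and, in the real
# setup, the same model, session, and finalized graph).
with ThreadPoolExecutor(max_workers=4) as pool:
    samples = list(pool.map(make_sample, range(8)))
```

Note that in CPython, threads do not speed up CPU-bound pure-Python code because of the global interpreter lock, so this only helps when the heavy work releases the lock (as many NumPy and TensorFlow operations do).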

Upvotes: 4
