George
George

Reputation: 5691

make multiple parallel predictions on tensorflow model

I want to make multiple predictions.

I have trained a segmentation model (images and masks). You can find the model here.

The images have dimensions (32,32,3). The masks (32, 32).

What I am doing when I want to run inference is:

Load the images array (tiles) with dim (62500, 32, 32, 3). You can find it here

Create tensorflow dataset from this array.

and then predict on each image, like:

# Predict one mask per image: wrap each image as a batch of one, run the
# model, then threshold the prediction at 0.5 into a {0, 255} mask.
# NOTE(review): `np.expand_dims(element, 0)` gives (1, 32, 32, 3), and the
# `[-1, -1, :, :]` indexing then drops the first TWO axes, leaving (32, 3),
# not (32, 32, 3) -- confirm this is really the intended input shape.
masks = [] 
for k, element in enumerate(the_image_array):
        the_img = np.asarray(np.expand_dims(element, 0))[-1, -1, :, :]
        pred = model.predict(the_img[np.newaxis, :, :, :])[0]
        mask = tf.where(pred > 0.5, 255, 0)
        masks.append(mask)

Now, I want to do these predictions in parallel.

So, I tried:

import tensorflow as tf
import numpy as np
import os
from tensorflow.keras.models import load_model
from itertools import chain
from tensorflow.keras import backend as K
import multiprocessing
from multiprocessing import Pool

# Force CPU-only inference so the worker processes don't contend for a GPU.
os.environ['CUDA_VISIBLE_DEVICES'] = '-1'

# 'spawn' starts each worker in a fresh interpreter; safer with TensorFlow
# than the default 'fork' on Linux.
multiprocessing.set_start_method('spawn', force=True)


# Loaded at module level: under 'spawn' every worker re-imports this module
# and therefore loads its own copy of the model.
# NOTE(review): registering "K" as a custom object suggests the model was
# saved with backend functions embedded (e.g. in a Lambda layer) -- confirm.
model = load_model('./model.h5',
                   custom_objects={"K": K})
     

def resize_and_rescale(image):
    """Resize to 32x32 (preserving aspect ratio) and scale pixels to [0, 1]."""
    resized = tf.image.resize(image, (32, 32), preserve_aspect_ratio=True)
    return resized / 255.0
    
def prepare(ds):
    """Apply resize_and_rescale to every element of the dataset."""
    return ds.map(resize_and_rescale)

def _apply_df(data):
    """Worker function: predict masks for one batch of images.

    NOTE(review): `data` is a whole batch of shape (batch, 32, 32, 3), but
    the `[-1, -1, :, :]` indexing below keeps only a single slice of it, so
    each worker produces just one (32, 32) mask instead of one per image.
    With 4 workers that is 4 masks, which flatten to 4 * 32 = 128 rows of
    shape (32,) -- matching the unexpected output reported in the question.
    """
    img = np.asarray(np.expand_dims(data, 0))[-1,-1, :, :]
    print(img.shape)
    pred = model.predict(img[np.newaxis,  :, :, :], verbose=2)[0]
    
    #pred = model.predict(data)[0]
    # Threshold the last channel of the prediction into a {0, 255} mask.
    mask = tf.where(pred[:, :, -1] > 0.5, 255, 0)
    return mask

def apply_by_multiprocessing(data, workers):
    """Split `data` into `workers` batches and run _apply_df on each in parallel.

    data: a tf.data.Dataset of preprocessed images.
    workers: number of worker processes (one batch per worker).
    Returns a list with one _apply_df result per batch.
    """
    # Dataset.batch() requires an integer batch size; np.ceil returns a
    # float, so cast explicitly before handing it to TensorFlow.
    batch_size = int(np.ceil(len(data) / workers))
    # The context manager both closes and joins the pool, even if a worker
    # raises (the original closed the pool but never joined it).
    with Pool(processes=workers) as pool:
        result = pool.map(_apply_df, data.batch(batch_size))
    return list(result)


def after_prepare(data):
    """Wrap the raw image array in a tf.data.Dataset and preprocess it."""
    dataset = tf.data.Dataset.from_tensor_slices(data)
    return prepare(dataset)


def main():
    """Load the tiles, preprocess them, and predict masks with 4 workers."""
    tiles = np.load('tiles.npy')
    print(len(tiles))
    print(tiles[0].shape)

    dataset = after_prepare(tiles)
    print(len(dataset))

    per_worker_masks = apply_by_multiprocessing(dataset, workers=4)

    # Each worker returns a collection of masks; flatten them into one list.
    flattened = list(chain.from_iterable(per_worker_masks))
    print(len(flattened), flattened[0].shape)

    return flattened
   
    
    
if __name__=="__main__":
    # Guard is required: under the 'spawn' start method each worker
    # re-imports this module, and main() must not run in the workers.
    masks_flatten = main()
        

The len(masks_flatten) is 128 and the shape of an element is (32,).

I would expect it to be len=62500 and every element (mask) (32, 32).

--- UPDATE ---

So, I want something like this:

def _apply_df(data):
    """Predict and threshold a (32, 32) mask for every image in `data`."""
    def predict_one(image):
        # One model call per image; threshold the last channel at 0.5.
        pred = model.predict(image[np.newaxis, :, :, :], verbose=2)[0]
        return tf.where(pred[:, :, -1] > 0.5, 255, 0)

    return [predict_one(el) for el in data]

but without using the loop. Doing it in parallel.

Upvotes: 0

Views: 1004

Answers (1)

H4iku
H4iku

Reputation: 672

Your approach is not incorrect, but even inside a single worker, it's better to let the TensorFlow/NumPy vectorization do its job instead of writing an explicit for loop:

def _apply_df(data):
    """Predict masks for the whole batch in one call and binarize them."""
    predictions = model.predict(data)
    # Drop the trailing channel axis, then threshold into a {0, 255} mask.
    return tf.where(predictions.squeeze(axis=-1) > 0.5, 255, 0)

This is the complete code:

import tensorflow as tf
import numpy as np
import os
from tensorflow.keras.models import load_model
from itertools import chain
from tensorflow.keras import backend as K
import multiprocessing
from multiprocessing import Pool

# Force CPU-only inference so the worker processes don't contend for a GPU.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

# 'spawn' starts each worker in a fresh interpreter; safer with TensorFlow
# than the default 'fork' on Linux.
multiprocessing.set_start_method("spawn", force=True)


# Loaded at module level: under 'spawn' every worker re-imports this module
# and therefore loads its own copy of the model.
model = load_model("./model.h5", custom_objects={"K": K})


def resize_and_rescale(image):
    """Resize `image` to 32x32 keeping aspect ratio, then rescale to [0, 1]."""
    image = tf.image.resize(
        image,
        (32, 32),
        preserve_aspect_ratio=True,
    )
    return image / 255.0


def prepare(ds):
    """Map the resize-and-rescale preprocessing over the whole dataset."""
    preprocessed = ds.map(resize_and_rescale)
    return preprocessed


def _apply_df(data):
    """Vectorized worker: one predict() call for the batch, then threshold."""
    batch_pred = model.predict(data)
    binary = batch_pred.squeeze(axis=-1) > 0.5
    return tf.where(binary, 255, 0)


def apply_by_multiprocessing(data, workers):
    """Split `data` into `workers` batches and run _apply_df on each in parallel.

    data: a tf.data.Dataset of preprocessed images.
    workers: number of worker processes (one batch per worker).
    Returns a list with one _apply_df result per batch.
    """
    # Dataset.batch() requires an integer batch size; np.ceil returns a
    # float, so cast explicitly before handing it to TensorFlow.
    batch_size = int(np.ceil(len(data) / workers))
    # The context manager both closes and joins the pool, even if a worker
    # raises (the original closed the pool but never joined it).
    with Pool(processes=workers) as pool:
        result = pool.map(_apply_df, data.batch(batch_size))
    return list(result)


def after_prepare(data):
    """Build a tf.data.Dataset from `data` and apply the preprocessing."""
    return prepare(tf.data.Dataset.from_tensor_slices(data))


def main():
    """Predict a segmentation mask for every tile using 4 worker processes."""
    tiles = np.load("tiles.npy")
    dataset = after_prepare(tiles)
    per_worker = apply_by_multiprocessing(dataset, workers=4)

    # Each worker returns a stacked mask tensor; flatten to one mask per tile.
    flat_masks = list(chain.from_iterable(per_worker))
    print(len(flat_masks), flat_masks[0].shape)  # 62500 (32, 32)

    return flat_masks


if __name__ == "__main__":
    # Guard is required: under the 'spawn' start method each worker
    # re-imports this module, and main() must not run in the workers.
    masks_flatten = main()

Upvotes: 1

Related Questions