Reputation: 5691
I want to make multiple predictions.
I have trained a segmentation model (images and masks). You can find the model here.
The images have dimensions (32, 32, 3) and the masks have dimensions (32, 32).
What I do when I want to run inference is:
Load the image array (tiles), which has dimensions (62500, 32, 32, 3). You can find it here.
Create a TensorFlow dataset from this array,
and then predict on each image, like this:
# Run the model on one element at a time and collect the thresholded masks.
masks = []
for element in the_image_array:
    # Add a leading axis, then keep the last entry of the first two axes.
    expanded = np.asarray(np.expand_dims(element, 0))
    the_img = expanded[-1, -1, :, :]
    # predict() is given a batch of one; [0] drops that axis from the output.
    batch_of_one = the_img[np.newaxis, :, :, :]
    pred = model.predict(batch_of_one)[0]
    # Binarise: 255 where the prediction exceeds 0.5, otherwise 0.
    mask = tf.where(pred > 0.5, 255, 0)
    masks.append(mask)
Now, I want to do these predictions in parallel.
So, I tried:
import tensorflow as tf
import numpy as np
import os
from tensorflow.keras.models import load_model
from itertools import chain
from tensorflow.keras import backend as K
import multiprocessing
from multiprocessing import Pool
# '-1' hides every CUDA device from TensorFlow, so inference runs on the CPU.
os.environ['CUDA_VISIBLE_DEVICES'] = '-1'
# 'spawn' starts each worker in a fresh interpreter that re-imports this
# module, so every worker process executes the load_model() call below.
multiprocessing.set_start_method('spawn', force=True)
# Loaded at module level so that `_apply_df` can reach it as a global.
# NOTE(review): custom_objects presumably resolves a `K` reference stored in
# the saved model -- confirm against how model.h5 was built.
model = load_model('./model.h5',
custom_objects={"K": K})
def resize_and_rescale(image):
    """Resize `image` towards a 32x32 target and divide its values by 255.

    The resize is requested with ``preserve_aspect_ratio=True``.
    """
    target_size = (32, 32)
    resized = tf.image.resize(image, target_size, preserve_aspect_ratio=True)
    return resized / 255.0
def prepare(ds):
    """Return `ds` with `resize_and_rescale` mapped over every element."""
    return ds.map(resize_and_rescale)
def _apply_df(data):
    """Predict a single mask from the last entry of `data`.

    Only ``data[-1]`` is passed to the model, so exactly one mask is
    returned no matter how many entries `data` holds along its leading axis.
    """
    # expand_dims(data, 0)[-1, -1] is data[-1]: the last leading-axis entry.
    last_entry = np.asarray(np.expand_dims(data, 0))[-1, -1, :, :]
    print(last_entry.shape)
    # predict() is given a batch of one; [0] drops that axis from the output.
    batch_of_one = last_entry[np.newaxis, :, :, :]
    prediction = model.predict(batch_of_one, verbose=2)[0]
    # Threshold the last channel: 255 above 0.5, otherwise 0.
    last_channel = prediction[:, :, -1]
    return tf.where(last_channel > 0.5, 255, 0)
def apply_by_multiprocessing(data, workers):
    """Cut `data` into batches and run `_apply_df` on each in a process pool.

    Args:
        data: A sized dataset exposing ``batch()`` (here a ``tf.data.Dataset``).
        workers: Number of worker processes; `data` is cut into at most this
            many batches.

    Returns:
        A list with one `_apply_df` result per batch, in batch order.
    """
    # The context manager tears the pool down even if a worker raises.
    with Pool(processes=workers) as pool:
        # Dataset.batch() wants an integer size, but np.ceil() returns a
        # float; max(1, ...) keeps the size valid when `data` is empty.
        batch_size = max(1, int(np.ceil(len(data) / workers)))
        return pool.map(_apply_df, data.batch(batch_size))
def after_prepare(data):
    """Build a `tf.data.Dataset` from the slices of `data` and preprocess it."""
    return prepare(tf.data.Dataset.from_tensor_slices(data))
def main():
    """Load the tiles, predict masks in worker processes and flatten them.

    Returns:
        A flat list built by chaining the per-batch results together.
    """
    tiles = np.load('tiles.npy')
    print(len(tiles))
    print(tiles[0].shape)
    dataset = after_prepare(tiles)
    print(len(dataset))
    per_batch = apply_by_multiprocessing(dataset, workers=4)
    # Iterate every per-batch result and concatenate the items it yields.
    masks_flatten = [item for batch in per_batch for item in batch]
    print(len(masks_flatten), masks_flatten[0].shape)  #
    return masks_flatten


# The guard keeps spawned workers, which re-import this module, from
# running main() themselves.
if __name__ == "__main__":
    masks_flatten = main()
The len(masks_flatten) is 128 and the shape of each element is (32,).
I would expect the length to be 62500 and every element (mask) to have shape (32, 32).
--- UPDATE ---
So, I want something like this:
def _apply_df(data):
    """Predict one mask per element of `data`, one model call per element.

    Returns:
        A list of masks, in the iteration order of `data`.
    """
    def predict_mask(el):
        # predict() is given a batch of one; [0] drops that axis again.
        pred = model.predict(el[np.newaxis, :, :, :], verbose=2)[0]
        # Threshold the last channel: 255 above 0.5, otherwise 0.
        return tf.where(pred[:, :, -1] > 0.5, 255, 0)

    return [predict_mask(el) for el in data]
but without using the loop, i.e. running the predictions in parallel.
Upvotes: 0
Views: 1004
Reputation: 672
Your approach is not incorrect, but even inside a single worker, it's better to let the TensorFlow/NumPy vectorization do its job instead of writing an explicit for loop:
def _apply_df(data):
    """Predict masks for all of `data` with a single `model.predict` call."""
    probabilities = model.predict(data)
    # Drop the trailing axis (squeeze requires it to have length 1).
    single_channel = probabilities.squeeze(axis=-1)
    # Binarise: 255 where the prediction exceeds 0.5, otherwise 0.
    return tf.where(single_channel > 0.5, 255, 0)
This is the complete code:
import tensorflow as tf
import numpy as np
import os
from tensorflow.keras.models import load_model
from itertools import chain
from tensorflow.keras import backend as K
import multiprocessing
from multiprocessing import Pool
# "-1" hides every CUDA device from TensorFlow, so inference runs on the CPU.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
# "spawn" starts each worker in a fresh interpreter that re-imports this
# module, so every worker process executes the load_model() call below.
multiprocessing.set_start_method("spawn", force=True)
# Loaded at module level so that `_apply_df` can reach it as a global.
# NOTE(review): custom_objects presumably resolves a `K` reference stored in
# the saved model -- confirm against how model.h5 was built.
model = load_model("./model.h5", custom_objects={"K": K})
def resize_and_rescale(image):
    """Resize `image` towards a 32x32 target and divide its values by 255.

    The resize is requested with ``preserve_aspect_ratio=True``.
    """
    target_size = (32, 32)
    resized = tf.image.resize(image, target_size, preserve_aspect_ratio=True)
    return resized / 255.0
def prepare(ds):
    """Return `ds` with `resize_and_rescale` mapped over every element."""
    return ds.map(resize_and_rescale)
def _apply_df(data):
    """Predict masks for all of `data` with a single `model.predict` call."""
    probabilities = model.predict(data)
    # Drop the trailing axis (squeeze requires it to have length 1).
    single_channel = probabilities.squeeze(axis=-1)
    # Binarise: 255 where the prediction exceeds 0.5, otherwise 0.
    return tf.where(single_channel > 0.5, 255, 0)
def apply_by_multiprocessing(data, workers):
    """Cut `data` into batches and run `_apply_df` on each in a process pool.

    Args:
        data: A sized dataset exposing ``batch()`` (here a ``tf.data.Dataset``).
        workers: Number of worker processes; `data` is cut into at most this
            many batches.

    Returns:
        A list with one `_apply_df` result per batch, in batch order.
    """
    # The context manager tears the pool down even if a worker raises.
    with Pool(processes=workers) as pool:
        # Dataset.batch() wants an integer size, but np.ceil() returns a
        # float; max(1, ...) keeps the size valid when `data` is empty.
        batch_size = max(1, int(np.ceil(len(data) / workers)))
        return pool.map(_apply_df, data.batch(batch_size))
def after_prepare(data):
    """Build a `tf.data.Dataset` from the slices of `data` and preprocess it."""
    return prepare(tf.data.Dataset.from_tensor_slices(data))
def main():
    """Load the tiles, predict masks in worker processes and flatten them.

    Returns:
        A flat list built by chaining the per-batch results together.
    """
    dataset = after_prepare(np.load("tiles.npy"))
    per_batch = apply_by_multiprocessing(dataset, workers=4)
    # Iterate every per-batch result and concatenate the items it yields.
    masks_flatten = [item for batch in per_batch for item in batch]
    print(len(masks_flatten), masks_flatten[0].shape)  # 62500 (32, 32)
    return masks_flatten


# The guard keeps spawned workers, which re-import this module, from
# running main() themselves.
if __name__ == "__main__":
    masks_flatten = main()
Upvotes: 1