Reputation: 11
I used the Keras functional API to build a ResNet model. However, the weights saved from this model do not seem to be correct: the validation results during training were very good, but the inference results after loading the saved weights are very bad.
When I enable tf.config.run_functions_eagerly(True)
in the model training stage, the inference result is very good; otherwise, the inference result is very bad. To tackle this problem, I studied some of the sample code in keras.applications, but saving the model weights still does not work correctly.
The ResNet implementation is shown below:
import tensorflow.compat.v2 as tf
from keras.regularizers import l2
from keras import layers
from keras.engine import sequential
from keras.engine import training as training_lib
import keras as K
identitys=None
def Bottleneck(inputs, out_channel, name, downsample, strides=1):
    """Build one bottleneck residual block (1x1 -> 3x3 -> 1x1 convolutions).

    Args:
        inputs: Input Keras tensor.
        out_channel: Number of filters of the first two convolutions. The
            last convolution (and the projected shortcut) use
            ``out_channel * 4`` filters.
        name: Prefix prepended to the name of every layer created here.
        downsample: When true, the shortcut branch is projected with a
            strided 1x1 convolution followed by batch normalization;
            otherwise the input is added unchanged.
        strides: Stride of the 3x3 convolution and of the shortcut
            projection.

    Returns:
        The output tensor of the block's final ReLU.
    """
    expansion = 4
    key = out_channel * expansion

    # The shortcut is kept in a plain local variable. It was previously
    # written to a module-level global, which served no purpose and kept a
    # reference to the last block's tensor alive between calls.
    if downsample:
        shortcut = layers.Conv2D(key, kernel_size=1, strides=strides,
                                 use_bias=False, kernel_initializer='he_normal',
                                 padding="SAME", kernel_regularizer=l2(1.e-5),
                                 name=name + "ds_conv")(inputs)
        shortcut = layers.BatchNormalization(momentum=0.9, epsilon=1e-5,
                                             name=name + "ds_normal")(shortcut)
    else:
        shortcut = inputs

    # NOTE: layer names (including the non-sequential "BN_3"/"BN_4") are
    # kept exactly as they were, because saved weights may be matched to
    # layers by name.
    xb = layers.Conv2D(out_channel, kernel_size=1, use_bias=False,
                       kernel_initializer='he_normal',
                       kernel_regularizer=l2(1.e-4),
                       name=name + "Conv2D_1")(inputs)
    xb = layers.BatchNormalization(momentum=0.9, epsilon=1e-5,
                                   name=name + "BN_1")(xb)
    xb = layers.Activation(tf.keras.activations.swish, name=name + "ACT_1")(xb)

    xb = layers.Conv2D(out_channel, kernel_size=3, use_bias=False,
                       strides=strides, padding="SAME",
                       kernel_initializer='he_normal',
                       kernel_regularizer=l2(1.e-4),
                       name=name + "Conv2D_2")(xb)
    xb = layers.BatchNormalization(momentum=0.9, epsilon=1e-5,
                                   name=name + "BN_3")(xb)
    xb = layers.ReLU(name=name + "ReLU")(xb)

    xb = layers.Conv2D(key, kernel_size=1, use_bias=False,
                       kernel_initializer='he_normal',
                       kernel_regularizer=l2(1.e-4),
                       name=name + "Conv2D_3")(xb)
    xb = layers.BatchNormalization(momentum=0.9, epsilon=1e-5,
                                   name=name + "BN_4")(xb)

    # Merge the residual branch with the shortcut, then normalize/activate.
    xb = layers.Add(name=name + "addition")([shortcut, xb])
    xb = layers.BatchNormalization(momentum=0.9, epsilon=1e-5,
                                   name=name + "Last_BN")(xb)
    xb = layers.ReLU(name=name + "LastReLU")(xb)
    return xb
def _make_layer(inputs, make_block, channel, block_num, layer_name, strides, down_sample):
i = 0
name = layer_name + f"block_{i + 1}_"
xm = make_block(inputs=inputs, out_channel=channel, name=name, strides=strides, downsample=down_sample)
for i in range(1, block_num):
i += 1
name = layer_name + f"block_{i}_"
xm = make_block(inputs=xm, out_channel=channel, name=name, strides=1, downsample=False)
return xm
class ResnetBuilder(object):
    """Factory of ResNet-style image classifiers built with the functional API."""

    @staticmethod
    def build(block, blocks_num, im_width=224, im_height=224, num_classes=1000):
        """Assemble the full classification model.

        Args:
            block: Residual block builder passed on to ``_make_layer``.
            blocks_num: Sequence of four ints, the number of blocks in each
                of the four stages.
            im_width: Input image width in pixels.
            im_height: Input image height in pixels.
            num_classes: Number of units of the final dense layer.

        Returns:
            A Keras model mapping an image batch to softmax probabilities.
        """
        # Keras image tensors are (height, width, channels); the dimensions
        # were previously given as (width, height), which only worked
        # because every caller uses square images.
        img_input = layers.Input(shape=(im_height, im_width, 3),
                                 dtype="float32",
                                 name="layers_inputs")

        # Stem: 7x7 strided conv, depthwise 3x3 conv, then max pooling.
        x = layers.Conv2D(filters=64, kernel_size=7, strides=2,
                          padding="SAME", use_bias=False,
                          name="layers_conv1")(img_input)  # replace this line with ContourOperator
        x = layers.BatchNormalization(momentum=0.9, epsilon=1e-5, name="x_FBN")(x)
        x = layers.ReLU(name="FReLU")(x)
        # DepthwiseConv2D regularizes its kernel through
        # `depthwise_regularizer`; `kernel_regularizer` is accepted via
        # **kwargs but is not applied to the depthwise kernel.
        x = layers.DepthwiseConv2D(
            kernel_size=3, padding="SAME", use_bias=False,
            depthwise_initializer=tf.keras.initializers.TruncatedNormal(
                mean=0.0, stddev=0.05, seed=None),
            depthwise_regularizer=l2(1.e-4), name="FDW")(x)
        x = layers.BatchNormalization(momentum=0.9, epsilon=1e-5, name="FBN")(x)
        x = layers.Activation(tf.keras.activations.swish, name="FirstACT")(x)
        x = layers.MaxPool2D(pool_size=3, strides=2, padding="SAME", name="f_MP")(x)

        # Four residual stages; every stage projects its first shortcut.
        x = _make_layer(x, block, 64, block_num=blocks_num[0], layer_name="ml1", strides=1, down_sample=True)
        x = _make_layer(x, block, 128, block_num=blocks_num[1], layer_name="ml2", strides=2, down_sample=True)
        x = _make_layer(x, block, 256, block_num=blocks_num[2], layer_name="ml3", strides=2, down_sample=True)
        x = _make_layer(x, block, 512, block_num=blocks_num[3], layer_name="ml4", strides=2, down_sample=True)

        # Classification head.
        x = layers.GlobalAvgPool2D(name="GAP2D")(x)  # pool + flatten
        x = layers.Dense(num_classes, name="logits")(x)
        predict = layers.Softmax(name="SoftMax")(x)

        model = training_lib.Model(inputs=img_input,
                                   outputs=predict, name="model")
        return model

    @staticmethod
    def resnet101(im_width=448, im_height=448,
                  include_top=True, num_classes=5):
        """Build the [3, 4, 23, 3] bottleneck variant.

        ``include_top`` is accepted but currently not used by the builder.
        """
        return ResnetBuilder.build(Bottleneck, [3, 4, 23, 3],
                                   im_width, im_height, num_classes)

    @staticmethod
    def resnet50(im_width=448, im_height=448,
                 include_top=True,
                 num_classes=5, **kwargs):
        """Build the [3, 4, 6, 3] bottleneck variant.

        ``include_top`` and any extra keyword arguments are accepted but
        currently not used by the builder.
        """
        return ResnetBuilder.build(Bottleneck, [3, 4, 6, 3],
                                   im_width, im_height, num_classes)
The weight-saving code is shown below:
# Create the model under the distribution strategy's scope.
with strategy.scope():  # distribution training.
    model = get_model()

# One weight file per epoch; the callback fills in the epoch number.
checkpoint_path = save_train_data + "/" + "model.{epoch:02d}-" + ".h5"
save_weight = MultiGPUCheckpointCallback(filepath=checkpoint_path,
                                         base_model=model,
                                         save_weights_only=True)

# Step counts are the sample counts divided by the batch size, rounded up.
# (The stray "." after `training_step_nums` was a syntax error.)
history = model.fit(x=train_dataset, validation_data=valid_dataset,
                    steps_per_epoch=int(np.ceil(training_step_nums / BATCH_SIZE)),
                    validation_steps=int(np.ceil(validation_nums / BATCH_SIZE)),
                    epochs=EPOCHES, verbose="auto", callbacks=[save_weight])
The inference code is shown below:
from keras import Model
from keras.utils import image_utils
import tensorflow as tf
import numpy as np
import os
from test_code import ResnetBuilder
# Restrict TensorFlow to the second physical GPU (index 1) when one exists.
# On machines with fewer than two GPUs the default device visibility is kept
# instead of failing with an IndexError on `gpus[1]`.
gpus = tf.config.experimental.list_physical_devices(device_type='GPU')
if len(gpus) > 1:
    tf.config.experimental.set_visible_devices(devices=gpus[1], device_type='GPU')
def preprocess_image(img_path, target_size=(448, 448)):
    """Load an image file, resize it and scale its pixel values by 1/255.

    Args:
        img_path: A string, path of the image file.
        target_size: A tuple, the size the image is resized to on load.
    Return:
        An image ndarray whose values were divided by 255.0.
    """
    loaded = image_utils.load_img(img_path, target_size=target_size)
    pixels = image_utils.img_to_array(loaded)
    # Same normalization as before, applied in place on the pixel array.
    pixels /= 255.0
    return pixels
def load_trained_model():
    """Rebuild the ResNet-50 variant and load the epoch-30 weights into it.

    Returns:
        The model with weights loaded from ``./model.30-.h5``.
    """
    # Pass the arguments by keyword: the third positional parameter of
    # `resnet50` is `include_top`, so the former positional `5` never
    # reached `num_classes` (it only worked because the default is also 5).
    model = ResnetBuilder.resnet50(im_width=448, im_height=448, num_classes=5)
    model_name = r"./model.30-.h5"
    # NOTE(review): with by_name=True weights are matched by layer name;
    # presumably layers without a matching name in the file keep their
    # initial values without an error -- confirm against the Keras docs.
    model.load_weights(model_name, by_name=True)
    print('model load success.')
    return model
def get_category_name(full_image_path, model):
    """Classify one image file and return the name of the predicted class.

    Args:
        full_image_path: Path of the image file to classify.
        model: The trained Keras model; it is called on a batch of one image.

    Returns:
        One of 'A1' .. 'A5', chosen by the arg-max of the model output.
    """
    img = preprocess_image(full_image_path)
    # The model expects a batch dimension in front of the image.
    img_tensor = np.expand_dims(img, axis=0)
    # Call the model directly instead of wrapping it in a new `Model` for
    # every image, and request inference mode explicitly.
    predictions = model(img_tensor, training=False)
    category_id = int(np.argmax(predictions[0]))
    label_name = ['A1', 'A2', 'A3', "A4", "A5"]
    return label_name[category_id]
# Build the network, load its weights and print the layer overview.
model = load_trained_model()
model.summary()

image_folder = r"[Image_Path]"
save_path = r"[Save_Path]"

# Classify every entry found in the image folder and print
# "<predicted label>_<file name>" for each of them.
name_list = os.listdir(image_folder)
for file_name in name_list:
    full_image_name = f"{image_folder}/{file_name}"
    category_name = get_category_name(full_image_name, model)
    save_name = f"{category_name}_{file_name}"  # just print result, not save image with a new name.
    print(save_name)
The weight-saving callback is shown below:
import warnings
import numpy as np
from keras.callbacks import Callback
class MultiGPUCheckpointCallback(Callback):
    """Checkpoint callback that always saves an explicitly given base model.

    Args:
        filepath: Format string of the output path; it is formatted with
            ``epoch`` (1-based) and every entry of the epoch's ``logs``.
        base_model: The model whose weights (or whole state) are saved.
        monitor: Name of the ``logs`` entry compared when
            ``save_best_only`` is true.
        verbose: Messages are printed when greater than 0.
        save_best_only: Save only when the monitored value improves.
        save_weights_only: Call ``save_weights`` instead of ``save``.
        mode: 'min', 'max' or 'auto'. In 'auto' mode larger is better when
            ``monitor`` contains 'acc' or starts with 'fmeasure', otherwise
            smaller is better.
        period: Number of epochs between two checkpoints.
    """

    def __init__(self, filepath, base_model, monitor='val_loss', verbose=0,
                 save_best_only=False, save_weights_only=False,
                 mode='auto', period=1):
        super(MultiGPUCheckpointCallback, self).__init__()
        self.base_model = base_model
        self.monitor = monitor
        self.verbose = verbose
        self.filepath = filepath
        self.save_best_only = save_best_only
        self.save_weights_only = save_weights_only
        self.period = period
        self.epochs_since_last_save = 0

        if mode not in ['auto', 'min', 'max']:
            warnings.warn('ModelCheckpoint mode %s is unknown, '
                          'fallback to auto mode.' % (mode),
                          RuntimeWarning)
            mode = 'auto'

        if mode == 'auto':
            higher_is_better = ('acc' in self.monitor
                                or self.monitor.startswith('fmeasure'))
        else:
            higher_is_better = mode == 'max'

        # `np.inf` replaces the `np.Inf` alias, which NumPy 2.0 removed.
        if higher_is_better:
            self.monitor_op = np.greater
            self.best = -np.inf
        else:
            self.monitor_op = np.less
            self.best = np.inf

    def _save_base_model(self, filepath):
        """Write the base model (weights only, or in full) to ``filepath``."""
        if self.save_weights_only:
            self.base_model.save_weights(filepath, overwrite=True)
        else:
            self.base_model.save(filepath, overwrite=True)

    def on_epoch_end(self, epoch, logs=None):
        """Save a checkpoint every ``period`` epochs.

        Args:
            epoch: Zero-based index of the epoch that just finished.
            logs: Optional dict of metric values for the epoch.
        """
        logs = logs or {}
        self.epochs_since_last_save += 1
        if self.epochs_since_last_save < self.period:
            return
        self.epochs_since_last_save = 0
        filepath = self.filepath.format(epoch=epoch + 1, **logs)

        if not self.save_best_only:
            if self.verbose > 0:
                print('Epoch %05d: saving model to %s' % (epoch + 1, filepath))
            self._save_base_model(filepath)
            return

        current = logs.get(self.monitor)
        if current is None:
            # Without the monitored value there is nothing to compare.
            warnings.warn('Can save best model only with %s available, '
                          'skipping.' % (self.monitor), RuntimeWarning)
            return

        if self.monitor_op(current, self.best):
            if self.verbose > 0:
                print('Epoch %05d: %s improved from %0.5f to %0.5f,'
                      ' saving model to %s'
                      % (epoch + 1, self.monitor, self.best,
                         current, filepath))
            self.best = current
            self._save_base_model(filepath)
        elif self.verbose > 0:
            print('Epoch %05d: %s did not improve' %
                  (epoch + 1, self.monitor))
How can I save all of the weights correctly, so that inference with the loaded model gives the right results?
Upvotes: 1
Views: 19