Reputation: 431
I am implementing a (variational) autoencoder (VAE) in Keras following this guide.
The VAE reads data from topomaps
folder and labels from labels
folder. They both contain .npy
files, which are essentially numpy arrays stored on disk.
I split data set into 80% training data and 20% test data. Specifically:
x_train
shape is (245760, 40, 40, 1)
y_train
shape is (245760,)
x_test
shape is (61440, 40, 40, 1)
y_test
shape is (61440,)
Unfortunately even though the (training) loss
and val_loss
are pretty low (2.9246e-04
and -4.8249e-04
, respectively), if I visually check the "reconstruction skills" of my VAE, I can notice they are poor since reconstructed image is not similar at all to the original one:
I ran the demo using this configuration:
latent_dim=25
. This value must not be changedepochs=2
batch_size=512
optimizer=Adam()
learning_rate=0.001
I know epochs
is very small, but it is only a demo. I aim to enlarge it when I figure out why my VAE won't work even though loss is very low.
This is the run output:
Epoch 1/2
2023-08-30 11:35:49.408811: I tensorflow/core/grappler/optimizers/custom_graph_optimizer_registry.cc:114] Plugin optimizer for device_type GPU is enabled.
384/384 [==============================] - ETA: 0s - loss: 60.0042 - reconstruction_loss: 11.0072 - kl_loss: 0.09892023-08-30 11:38:40.538661: I tensorflow/core/grappler/optimizers/custom_graph_optimizer_registry.cc:114] Plugin optimizer for device_type GPU is enabled.
384/384 [==============================] - 190s 492ms/step - loss: 59.8772 - reconstruction_loss: 11.0072 - kl_loss: 0.0989 - val_loss: 5.5495e-04 - val_reconstruction_loss: 5.4875e-04 - val_kl_loss: 6.1989e-06
Epoch 2/2
384/384 [==============================] - 188s 490ms/step - loss: 3.0879e-04 - reconstruction_loss: 3.0472e-04 - kl_loss: 1.5222e-06 - val_loss: 3.4318e-04 - val_reconstruction_loss: 3.4303e-04 - val_kl_loss: 1.4901e-07
2023-08-30 11:42:17.049 Python[2419:58392] +[CATransaction synchronize] called within transaction
2023-08-30 11:42:26.493 Python[2419:58392] +[CATransaction synchronize] called within transaction
2023-08-30 11:42:37.200 Python[2419:58392] +[CATransaction synchronize] called within transaction
2023-08-30 11:42:41.800433: I tensorflow/core/grappler/optimizers/custom_graph_optimizer_registry.cc:114] Plugin optimizer for device_type GPU is enabled.
1920/1920 [==============================] - 29s 15ms/step
2023-08-30 11:43:16.276 Python[2419:58392] +[CATransaction synchronize] called within transaction
Process finished with exit code 0
Learning curve:
These are three model classes:
class VAE(keras.Model):
def __init__(self, encoder, decoder, **kwargs):
super().__init__(**kwargs)
self.encoder = encoder
self.decoder = decoder
self.total_loss_tracker = keras.metrics.Mean(name="total_loss")
self.reconstruction_loss_tracker = keras.metrics.Mean(name="reconstruction_loss")
self.kl_loss_tracker = keras.metrics.Mean(name="kl_loss")
def call(self, inputs, training=None, mask=None):
_, _, z = self.encoder(inputs)
outputs = self.decoder(z)
return outputs
@property
def metrics(self):
return [
self.total_loss_tracker,
self.reconstruction_loss_tracker,
self.kl_loss_tracker,
]
def train_step(self, data):
with tf.GradientTape() as tape:
# Forward pass
z_mean, z_log_var, z = self.encoder(data)
reconstruction = self.decoder(z)
# Compute losses
reconstruction_loss = tf.reduce_mean(
tf.reduce_sum(
keras.losses.binary_crossentropy(data, reconstruction), axis=(1, 2)
)
)
kl_loss = -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))
total_loss = reconstruction_loss + kl_loss
# Compute gradient
grads = tape.gradient(total_loss, self.trainable_weights)
# Update weights
self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
# Update my own metrics
self.total_loss_tracker.update_state(total_loss)
self.reconstruction_loss_tracker.update_state(reconstruction_loss)
self.kl_loss_tracker.update_state(kl_loss)
return {
"loss": self.total_loss_tracker.result(),
"reconstruction_loss": self.reconstruction_loss_tracker.result(),
"kl_loss": self.kl_loss_tracker.result(),
}
def test_step(self, data):
# Forward pass
z_mean, z_log_var, z = self.encoder(data)
reconstruction = self.decoder(z)
# Compute losses
reconstruction_loss = tf.reduce_mean(
tf.reduce_sum(
keras.losses.binary_crossentropy(data, reconstruction), axis=(1, 2)
)
)
kl_loss = -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))
total_loss = reconstruction_loss + kl_loss
# Update my own metrics
self.total_loss_tracker.update_state(total_loss)
self.reconstruction_loss_tracker.update_state(reconstruction_loss)
self.kl_loss_tracker.update_state(kl_loss)
return {
"loss": self.total_loss_tracker.result(),
"reconstruction_loss": self.reconstruction_loss_tracker.result(),
"kl_loss": self.kl_loss_tracker.result(),
}
class Encoder(keras.Model):
def __init__(self, latent_dimension, input_shape):
super(Encoder, self).__init__()
self.latent_dim = latent_dimension
self.conv_block1 = keras.Sequential([
layers.Input(shape=input_shape),
layers.Conv2D(filters=64, kernel_size=3, activation="relu", strides=2, padding="same"),
layers.BatchNormalization()
])
self.conv_block2 = keras.Sequential([
layers.Conv2D(filters=128, kernel_size=3, activation="relu", strides=2, padding="same"),
layers.BatchNormalization()
])
self.conv_block3 = keras.Sequential([
layers.Conv2D(filters=256, kernel_size=3, activation="relu", strides=2, padding="same"),
layers.BatchNormalization()
])
self.flatten = layers.Flatten()
self.dense = layers.Dense(units=100, activation="relu")
self.z_mean = layers.Dense(latent_dimension, name="z_mean")
self.z_log_var = layers.Dense(latent_dimension, name="z_log_var")
self.sampling = sample
def call(self, inputs, training=None, mask=None):
x = self.conv_block1(inputs)
x = self.conv_block2(x)
x = self.conv_block3(x)
x = self.flatten(x)
x = self.dense(x)
z_mean = self.z_mean(x)
z_log_var = self.z_log_var(x)
z = self.sampling(z_mean, z_log_var)
return z_mean, z_log_var, z
class Decoder(keras.Model):
def __init__(self, latent_dimension):
super(Decoder, self).__init__()
self.latent_dim = latent_dimension
self.dense1 = keras.Sequential([
layers.Dense(units=100, activation="relu"),
layers.BatchNormalization()
])
self.dense2 = keras.Sequential([
layers.Dense(units=1024, activation="relu"),
layers.BatchNormalization()
])
self.dense3 = keras.Sequential([
layers.Dense(units=4096, activation="relu"),
layers.BatchNormalization()
])
self.reshape = layers.Reshape((4, 4, 256))
self.deconv1 = keras.Sequential([
layers.Conv2DTranspose(filters=256, kernel_size=3, activation="relu", strides=2, padding="same"),
layers.BatchNormalization()
])
self.deconv2 = keras.Sequential([
layers.Conv2DTranspose(filters=128, kernel_size=3, activation="relu", strides=1, padding="same"),
layers.BatchNormalization()
])
self.deconv3 = keras.Sequential([
layers.Conv2DTranspose(filters=128, kernel_size=3, activation="relu", strides=2, padding="valid"),
layers.BatchNormalization()
])
self.deconv4 = keras.Sequential([
layers.Conv2DTranspose(filters=64, kernel_size=3, activation="relu", strides=1, padding="valid"),
layers.BatchNormalization()
])
self.deconv5 = keras.Sequential([
layers.Conv2DTranspose(filters=64, kernel_size=3, activation="relu", strides=2, padding="valid"),
layers.BatchNormalization()
])
self.deconv6 = layers.Conv2DTranspose(filters=1, kernel_size=2, activation="sigmoid", padding="valid")
def call(self, inputs, training=None, mask=None):
x = self.dense1(inputs)
x = self.dense2(x)
x = self.dense3(x)
x = self.reshape(x)
x = self.deconv1(x)
x = self.deconv2(x)
x = self.deconv3(x)
x = self.deconv4(x)
x = self.deconv5(x)
decoder_outputs = self.deconv6(x)
return decoder_outputs
Note that the implementation of the VAE belongs to the guide I already posted.
This is the main function:
if __name__ == '__main__':
# Load data
x_train, x_test, y_train, y_test = load_data("topomaps", "labels", 0.2)
# Expand dimensions to (None, 40, 40, 1)
x_train = np.expand_dims(x_train, -1)
x_test = np.expand_dims(x_test, -1)
# Print data shapes
print("x_train shape:", x_train.shape)
print("y_train shape:", y_train.shape)
print("x_test shape:", x_test.shape)
print("y_test shape:", y_test.shape)
# Normalize the data
x_train = x_train.astype("float32") / 255.0
x_test = x_test.astype("float32") / 255.0
# Compiling the VAE
latent_dimension = 25 # Do not change
encoder = Encoder(latent_dimension, (40, 40, 1))
decoder = Decoder(latent_dimension)
vae = VAE(encoder, decoder)
vae.compile(Adam(learning_rate=0.001))
# Training
x_train, x_val, y_train, y_val = train_test_split(x_train, y_train, test_size=0.2)
print("x_val shape:", x_val.shape)
print("y_val shape:", y_val.shape)
epochs = 2
batch_size = 512
history = vae.fit(x_train, epochs=epochs, batch_size=batch_size, validation_data=(x_val,))
# Plot learning curves
plot_metric(history, "loss")
# Check reconstruction skills against a random test sample
image_index = 5
plt.title(f"Original image {image_index}")
original_image = x_test[image_index]
plt.imshow(original_image, cmap="gray")
plt.show()
plt.title(f"Reconstructed image {image_index}, latent_dim = {latent_dimension}, epochs = {epochs}, "
f"batch_size = {batch_size}")
x_test_reconstructed = vae.predict(x_test)
reconstructed_image = x_test_reconstructed[image_index]
plt.imshow(reconstructed_image, cmap="gray")
plt.show()
These are some functions I used:
def load_data(topomaps_folder: str, labels_folder: str, test_size):
x, y = _create_dataset(topomaps_folder, labels_folder)
x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=test_size)
return x_train, x_test, y_train, y_test
def _create_dataset(topomaps_folder, labels_folder):
topomaps_files = os.listdir(topomaps_folder)
labels_files = os.listdir(labels_folder)
topomaps_files.sort()
labels_files.sort()
x = []
y = []
n_files = len(topomaps_files)
for topomaps_file, labels_file in tqdm(zip(topomaps_files, labels_files), total=n_files, desc="Loading data set"):
topomaps_array = np.load(f"{topomaps_folder}/{topomaps_file}")
labels_array = np.load(f"{labels_folder}/{labels_file}")
if topomaps_array.shape[0] != labels_array.shape[0]:
raise Exception("Shapes must be equal")
for i in range(topomaps_array.shape[0]):
x.append(topomaps_array[i])
y.append(labels_array[i])
x = np.array(x)
y = np.array(y)
return x, y
def sample(z_mean, z_log_var):
batch = tf.shape(z_mean)[0]
dim = tf.shape(z_mean)[1]
epsilon = tf.random.normal(shape=(batch, dim))
stddev = tf.exp(0.5 * z_log_var)
return z_mean + stddev * epsilon
def plot_metric(history, metric):
plt.plot(history.history[metric])
plt.plot(history.history['val_' + metric])
plt.title(metric)
plt.ylabel(metric)
plt.xlabel('epoch')
plt.legend(['train', 'validation'])
plt.show()
EDIT
- Could you try with much lower training set to see if it overfits and reconstructs a known image? Okay, this is what I did in main
function
# Reduce DS size
x_train = x_train[:500]
y_train = y_train[:500]
x_test = x_test[:500]
y_test = y_test[:500]
This is what I get:
and learning curve is:
If I run the same configuration but setting epochs=100
I get:
and loss:
- Not sure if you did it...but did you convert your output back to a non-normalized value when plotting? This is what I have done:
plt.title(f"Reconstructed image {image_index}, latent_dim = {latent_dimension}, epochs = {epochs}, "
f"batch_size = {batch_size}")
x_test_reconstructed = vae.predict(x_test)
reconstructed_image = x_test_reconstructed[image_index]
reconstructed_image = reconstructed_image * 255
plt.imshow(reconstructed_image, cmap="gray")
plt.show()
But I still get:
- What does the loss graph look like when the batch size is 4 and learning rate is 0.0002 for 100 epochs? Reconstructed image is all black and loss curve is:
Upvotes: 0
Views: 708
Reputation: 431
The problem concerned the data normalization.
x_train = (x_train - np.min(x_train)) / (np.max(x_train) - np.min(x_train))
and
x_test = (x_test - np.min(x_test)) / (np.max(x_test) - np.min(x_test))
should resolve the issue.
Upvotes: 0
Reputation: 1145
I ran on my implementation with no problems. I ran on a 'copy' of the VAE tutorial and got the following results after 1 epoch:
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import cv2
import matplotlib.pyplot as plt
class Sampling(layers.Layer):
def call(self, inputs):
z_mean, z_log_var = inputs
batch = tf.shape(z_mean)[0]
dim = tf.shape(z_mean)[1]
epsilon = tf.random.normal(shape=(batch, dim))
return z_mean + tf.exp(0.5 * z_log_var) * epsilon
latent_dim = 25
encoder_inputs = keras.Input(shape=(40, 40, 1))
x = layers.Conv2D(32, 3, activation="relu", strides=2, padding="same")(encoder_inputs)
x = layers.Conv2D(64, 3, activation="relu", strides=2, padding="same")(x)
x = layers.Flatten()(x)
x = layers.Dense(16, activation="relu")(x)
z_mean = layers.Dense(latent_dim, name="z_mean")(x)
z_log_var = layers.Dense(latent_dim, name="z_log_var")(x)
z = Sampling()([z_mean, z_log_var])
encoder = keras.Model(encoder_inputs, [z_mean, z_log_var, z], name="encoder")
encoder.summary()
latent_inputs = keras.Input(shape=(latent_dim,))
x = layers.Dense(6400, activation="relu")(latent_inputs)
x = layers.Reshape((10,10,64))(x)
x = layers.Conv2DTranspose(64, 3, activation="relu", strides=2, padding="same")(x)
x = layers.Conv2DTranspose(32, 3, activation="relu", strides=2, padding="same")(x)
decoder_outputs = layers.Conv2DTranspose(1, 3, activation="sigmoid", padding="same")(x)
decoder = keras.Model(latent_inputs, decoder_outputs, name="decoder")
decoder.summary()
class VAE(keras.Model):
def __init__(self, encoder, decoder, **kwargs):
super().__init__(**kwargs)
self.encoder = encoder
self.decoder = decoder
self.total_loss_tracker = keras.metrics.Mean(name="total_loss")
self.reconstruction_loss_tracker = keras.metrics.Mean(
name="reconstruction_loss"
)
self.kl_loss_tracker = keras.metrics.Mean(name="kl_loss")
@property
def metrics(self):
return [
self.total_loss_tracker,
self.reconstruction_loss_tracker,
self.kl_loss_tracker,
]
def train_step(self, data):
with tf.GradientTape() as tape:
z_mean, z_log_var, z = self.encoder(data)
reconstruction = self.decoder(z)
reconstruction_loss = tf.reduce_mean(
tf.reduce_sum(
keras.losses.binary_crossentropy(data, reconstruction), axis=(1, 2)
)
)
kl_loss = -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))
total_loss = reconstruction_loss + kl_loss
grads = tape.gradient(total_loss, self.trainable_weights)
self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
self.total_loss_tracker.update_state(total_loss)
self.reconstruction_loss_tracker.update_state(reconstruction_loss)
self.kl_loss_tracker.update_state(kl_loss)
return {
"loss": self.total_loss_tracker.result(),
"reconstruction_loss": self.reconstruction_loss_tracker.result(),
"kl_loss": self.kl_loss_tracker.result(),
}
def create_data():
x_train = []
data = cv2.imread("IMAGE",0) #used a screen clipping since I do not have your data source
data = cv2.resize(data, (40,40), interpolation=cv2.INTER_AREA) # resize it to the same 40x40 limit
gaussian = np.random.normal(0, 0.2, (data.shape[0],data.shape[1])) #add noise for realism
x_train = np.array([data]*1000) #duplicated with random noise 1000x
x_train[:] = x_train[:]+gaussian
x_test = x_train
return x_train, x_test, data
x_train,x_test , data = create_data()
x = x_train
x = np.expand_dims(x,-1).astype("float32") / 255
vae = VAE(encoder, decoder)
vae.compile(optimizer=keras.optimizers.Adam())
vae.fit(x, epochs=1, batch_size=8)
data_input = np.expand_dims(data,-1).astype("float32")/255
print(data_input.shape)
latent = vae.encoder.predict(x)
print(latent)
output = vae.decoder.predict(latent[0])
digit = output[0]
plt.figure("Source")
plt.imshow(data,cmap="gray")
plt.figure("output")
plt.imshow(digit,cmap='gray')
plt.show()
I didn't have your data so I could not check if your data loader is doing any mistakes. You probably want to check your call functions on your autoencoder and decoder but following the tutorial code I did not find your issue.
I would suggest a comprehensive debug with your data:
Unless I have some samples of your input data there really is no way to properly debug your code. I highly suspect the data inputs because the model construction code looks fine.
Upvotes: 0