Reputation: 29
I am trying to fine-tune an AE-LSTM, using keras tuner, to use the output of the embedding layer in the rest of my project.
To this end, I have defined two classes, one for train and test the fine-tuned model, which I called "LstmAe", and the other to fine-tune it, which I called "ApplyLstmAe."
The problem is when I try execute my fine_tune_the_model() method in the latter class I face the following error:
TypeError: Inputs to a layer should be tensors. Got: <keras_tuner.src.engine.hyperparameters.hyperparameters.HyperParameters object at 0x7fdc7c3438e0>
My failed attempted:
Changing build method as staticmethod, define "hps" inside the build function, changing InputLayer to Input etc., unfortunately I failed. Googling also did not help me to find a proper answer (although one unanswered question on SO and one issue on GitHub are available).
Here is the code to to reproduce:
import numpy as np
import pandas as pd
import tensorflow as tf
import keras_tuner as kt
from sklearn.model_selection import train_test_split
tfk = tf.keras
tfkl = tf.keras.layers
class LstmAe(tfk.Model):
def __init__(self, latent_dim: int = 50,
ngrams : int = 2,
vocabulary: list = None,
classification: bool = True,
max_seq_len: int = 100, *args, **kwargs):
super(LstmAe, self).__init__(*args, **kwargs)
self.max_seq_len = max_seq_len
if classification:
self.train_metric = tfk.metrics.Accuracy(name="acc")
self.val_metric = tfk.metrics.Accuracy(name="acc_val")
self.loss_fn = tfk.losses.SparseCategoricalCrossentropy(name="loss_fn")
pred_activation = "softmax"
else:
self.train_metric = tfk.metrics.LogCoshError()
self.val_metric = tfk.metrics.LogCoshError()
self.loss_fn = tfk.losses.Huber(name="loss_fn")
pred_activation = "tanh"
self.inputs = tfkl.InputLayer(
input_shape=(1,), dtype=tf.string,
)
self.txt_vec = tfkl.TextVectorization(
max_tokens=None,
vocabulary = vocabulary,
split="whitespace", ngrams=ngrams,
output_mode="int", ragged=False,
output_sequence_length=self.max_seq_len,
standardize="lower_and_strip_punctuation",
)
self.emb = tfkl.Embedding(
input_dim=self.txt_vec.vocabulary_size(),
output_dim=latent_dim,
)
self.enc1 = tfkl.Bidirectional(
tfkl.LSTM(
units=150,
activation="relu",
dropout=0.1,
return_sequences=True,
name="encoder1"
)
)
self.dec1 = tfkl.Bidirectional(
tfkl.LSTM(
units=50,
activation="relu",
dropout=0.1,
return_sequences=True,
name="decoder1"
)
)
self.dec2 = tfkl.Bidirectional(
tfkl.LSTM(
units=100,
activation="tanh",
dropout=0.1,
return_sequences=False,
name="decoder2"
)
)
self.outputs = tfkl.Dense(
units=self.max_seq_len, activation=pred_activation,
)
def call(self, inputs, training=None):
x = self.inputs(inputs, training)
x = self.txt_vec(x)
x = self.emb(x)
x = self.enc1(x)
x = self.dec1(x)
x = self.dec2(x)
x = self.outputs(x)
return x
@tf.function
def train_step(self, x, y):
with tf.GradientTape() as tape:
y_pred = self(x, training=True)
# y_true = self.inputs(self.txt_vec(x))
loss_value = self.loss_fn(y, y_pred)
grads = tape.gradient(loss_value, self.trainable_weights)
self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
self.train_metric.update_state(y, y_pred)
return loss_value
@tf.function
def test_step(self, x, y):
y_pred = self(x, training=False)
self.val_metric(y, y_pred)
def fit(self, train_data, test_data, n_epochs):
train_total_loss, val_total_loss = [], []
for epoch in range(n_epochs):
print(f"epoch: {epoch+1}")
for step, (x_batch_train, y_batch_train) in enumerate(train_data):
loss_value = self.train_step(x_batch_train, y_batch_train)
if step % 50 == 0:
print(
"Training loss (for one batch) at step %d: %.4f"
% (step, loss_value)
)
train_metric = self.train_metric.result()
train_total_loss.append(train_metric)
print("Training metric over epoch: %.3f" % (float(train_metric),))
self.train_metric.reset_states()
# Run a validation loop at the end of each epoch.
for x_batch_val, y_batch_val in test_data:
self.test_step(x_batch_val, y_batch_val)
val_metric = self.val_metric.result()
val_total_loss.append(val_metric)
self.val_metric.reset_states()
print("Validation metric: %.3f" % (float(val_metric),))
return train_total_loss, val_total_loss
class ApplyLstmAe(LstmAe):
def __init__(self, n_epochs: int= 1,
classification: bool = False, *args, **kwargs):
super(ApplyLstmAe, self).__init__(*args, **kwargs)
self.data_df = None
self.labels = None
self.text_data = None
self.vocabulary = None
self.max_seq_len = None
self.n_epochs = n_epochs
self.classification = classification
if self.classification:
self.pred_activation = "softmax"
self.loss_fn = tfk.losses.SparseCategoricalCrossentropy(name="loss_fn")
self.metric = ["accuracy"]
self.proj_name = "LSTM_AE-Cls"
self.dir_path = "./"
else:
self.pred_activation = "tanh"
self.loss_fn = tfk.losses.Huber(name="loss_fn")
self.metric = ["logcosh"]
self.proj_name = "LSTM_AE-Reg"
self.dir_path = "./"
def get_text_and_labels(
self, data_path:
str="../data/medium_movies_data.csv", ):
self.data_df = pd.read_csv(data_path)
self.labels = self.data_df.Genre.values
self.text_data = self.data_df.Synopsis.values
print(
f"text data head: \n {self.text_data[:3]} \n"
f"text data shape: {self.text_data.shape} \n"
f"labels head: \n {self.labels[:3]} \n"
f"labels shape: {self.labels.shape} \n"
)
def get_vocabulary(
self, vocab_path = "../data/",
max_seq_len: int = 123,
np_name = "medium.npz",
ngrams : int = 2,
) -> tuple:
""" returns, as attributes, the vocabulary (np.arr), its size (int),
the maximum sequence length (int) and applied ngrams (int). """
self.get_text_and_labels()
if not os.path.isfile(os.path.join(vocab_path, np_name)):
txt_vec = tfkl.TextVectorization(
max_tokens=None,
vocabulary = None,
output_sequence_length=None, # max_seq_len
split="whitespace", ngrams=ngrams,
output_mode="int", ragged=False,
standardize="lower_and_strip_punctuation",
)
txt_vec.adapt(
data=self.text_data, batch_size=8, steps=None
)
self.vocabulary = txt_vec.get_vocabulary()
self.vocab_size = txt_vec.vocabulary_size()
self.max_seq_len = max_seq_len
self.ngrams = ngrams
np.savez(os.path.join(
vocab_path, np_name),
max_seq_len = self.max_seq_len,
vocabulary=self.vocabulary,
vocab_size=self.vocab_size,
ngrams = self.ngrams,
)
else:
data_npz = np.load(
os.path.join(vocab_path, np_name)
)
self.vocabulary = data_npz["vocabulary"]
self.max_seq_len = int(data_npz["max_seq_len"])
self.vocab_size = int(data_npz["vocab_size"])
self.ngrams = int(data_npz["ngrams"])
return self.vocabulary, self.vocab_size, self.max_seq_len, self.ngrams
def get_train_test_data(self, batch_size=8, return_tensors=True) -> tuple:
vocab, _, max_seq_len, ngrams = self.get_vocabulary()
x_train, x_test, _, _ = train_test_split(
self.text_data, self.labels, test_size=0.05
)
lstm_ae = LstmAe(
vocabulary=vocab,
classification=False,
max_seq_len=max_seq_len,
ngrams=ngrams
)
lstm_ae.compile()
lstm_ae.inputs(self.text_data)
lstm_ae.txt_vec(self.text_data)
y_train = lstm_ae.predict(x_train)
y_test = lstm_ae.predict(x_test)
print(
f"x_train and y_train shapes: {x_train.shape, y_train.shape}"
f"x_test and y_test shapes: {x_test.shape, y_test.shape}"
)
if return_tensors:
train_data = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_data = train_data.shuffle(buffer_size=1024).batch(batch_size=batch_size)
test_data = tf.data.Dataset.from_tensor_slices((x_test, y_test))
test_data = test_data.shuffle(buffer_size=1024).batch(batch_size=batch_size)
return train_data, test_data,
else:
return x_train, y_train, x_test, y_test
def build(self, hp):
hp_units = hp.Int(
'units', min_value=32, max_value=256, step=32
)
hp_latent_dim = hp.Int(
'units', min_value=10, max_value=50, step=5
)
hp_activation = hp.Choice(
'activation', values = ["relu", "tanh", ]
)
hp_learning_rate = hp.Choice(
'learning_rate', values=[1e-3, 1e-4, 1e-5, 1e-6]
)
hp_dropout = hp.Choice(
'dropout', values=[0.0, 0.1, 0.4]
)
model = tfk.Sequential()
model.add(
tfkl.Input(shape=(1,),
dtype=tf.string,)
)
model.add(
tfkl.TextVectorization(
max_tokens=None,
vocabulary = self.vocabulary,
split="whitespace", ngrams=self.ngrams,
output_mode="int", ragged=False,
output_sequence_length=self.max_seq_len,
standardize="lower_and_strip_punctuation",
)
)
model.add(
tfkl.Embedding(
input_dim=self.txt_vec.vocabulary_size(),
output_dim=hp_latent_dim,
)
)
model.add(
tfkl.Bidirectional(
tfkl.LSTM(
units=hp_units,
activation=hp_activation,
dropout=hp_dropout,
return_sequences=True,
name="encoder1"
)
)
)
model.add(
tfkl.Bidirectional(
tfkl.LSTM(
units=hp_units,
activation=hp_activation,
dropout=hp_dropout,
return_sequences=True,
name="decoder1")
)
)
model.add(
tfkl.Bidirectional(
tfkl.LSTM(
units=hp_units,
activation=hp_activation,
dropout=hp_dropout,
return_sequences=True,
name="decoder2")
)
)
model.add(
tfkl.Dense(
units=hp_units,
activation=hp_activation,
)
)
model.add(
tfkl.Dense(
units=self.max_seq_len, activation=self.pred_activation,
)
)
model.compile(
loss=self.loss_fn,
optimizer=tfk.optimizers.SGD(learning_rate=hp_learning_rate),
metrics=self.metric,
)
return model
def fine_tune_the_model(self, return_tensors=False):
hps = kt.HyperParameters()
model = self.build(hp=hps)
tuner = kt.BayesianOptimization(
hypermodel=model,
objective="val_accuracy",
max_trials=10,
executions_per_trial=5,
overwrite=True,
directory=self.dir_path,
project_name=self.proj_name,
)
print(tuner.search_space_summary())
if return_tensors:
train_data, val_data = self.get_train_test_data(
batch_size=8, return_tensors=True
)
tuner.search(
train_data, epochs=10, validation_data=val_data
)
else:
x_train, y_train, x_test, \
y_test = self.get_train_test_data(return_tensors=False)
tuner.search(
x_train, y_train, epochs=10, validation_data=(x_test, y_test)
)
# models = tuner.get_best_models(num_models=1)
best_hps = tuner.get_best_hyperparameters(2)
with open(os.path.join("./best_hps" + self.proj_name), 'r') as fp:
fp.pickle(best_hps)
return best_hps
def train_test_tuned_model(self,):
vectorized_text, labels, max_len = self.get_preprocess_data(
data_path="../data/medium_movies_data.scv",
)
for k in range(5):
print(" to be completed ....")
Here is how I run the code:
fine_tuner = ApplyLstmAe()
vocab, vocab_size, max_seq_len, ngrams = fine_tuner.get_vocabulary()
return_tensors = False
if return_tensors is True:
x_train, y_train, x_test, y_test = fine_tuner.get_train_test_data(return_tensors=False)
else:
train_data, test_data = fine_tuner.get_train_test_data(return_tensors=True)
fine_tuner.fine_tune_the_model(return_tensors=True) # <- HERE produces the
# following type Erros
# TypeError: Inputs to a layer should be tensors.
# Got: <keras_tuner.src.engine.hyperparameters.hyperparameters.HyperParameters
# object at 0x7fdc7c3438e0>
May you please help me to figure it out what is the problem? Am I missing something?
Anyhow, it seems to me that it might be a bug in KT and thus I already opened an issue on their Github.
Thank you in advance.
Upvotes: 0
Views: 46