Reputation: 45
I am currently experimenting with generative adversarial networks in Keras. As proposed in this paper, I want to use the historical averaging loss function, meaning that I want to penalize changes in the network weights. I am not sure how to implement it in a clever way.
I implemented the custom loss function following the answer to this post.
import numpy as np
from keras import backend as K

def historical_averaging_wrapper(current_weights, prev_weights):
    def historical_averaging(y_true, y_pred):
        # Penalize the change between the two weight snapshots.
        diff = 0
        for i in range(len(current_weights)):
            diff += abs(np.sum(current_weights[i]) - np.sum(prev_weights[i]))
        return K.binary_crossentropy(y_true, y_pred) + diff
    return historical_averaging
The penalty depends on the network weights, and the weights change after each batch of data.
My first idea was to update the loss function after each batch. Roughly like this:
prev_weights = model.get_weights()
for i in range(len(training_data) // batch_size):
    current_weights = model.get_weights()
    # Recompile so the loss closes over the latest weight snapshots.
    model.compile(loss=historical_averaging_wrapper(current_weights, prev_weights), optimizer='adam')
    model.fit(training_data[i*batch_size:(i+1)*batch_size], training_labels[i*batch_size:(i+1)*batch_size], epochs=1, batch_size=batch_size)
    prev_weights = current_weights
Is this reasonable? The approach seems a bit "messy" to me. Is there a "smarter" way to do this, such as updating the loss function in a data generator and using fit_generator()? Thanks in advance.
Upvotes: 2
Views: 616
Reputation: 2682
Loss functions are operations on the graph using tensors. You can define additional tensors in the loss function to hold previous values. This is an example:
import numpy as np
import tensorflow as tf
import tensorflow.keras.backend as K
keras = tf.keras

class HistoricalAvgLoss(object):
    def __init__(self, model):
        self.model = model
        # create tensors (initialized to zero) to hold the previous value of the
        # weights
        self.prev_weights = []
        for w in model.get_weights():
            self.prev_weights.append(K.variable(np.zeros(w.shape)))

    def loss(self, y_true, y_pred):
        err = keras.losses.mean_squared_error(y_true, y_pred)
        # model.weights are the live variables; get_weights() would only give
        # NumPy snapshots taken when the loss is built
        weights = self.model.weights
        werr = [K.mean(K.abs(c - p)) for c, p in zip(weights, self.prev_weights)]
        self.prev_weights = K.in_train_phase(
            [K.update(p, c) for c, p in zip(weights, self.prev_weights)],
            self.prev_weights
        )
        return K.in_train_phase(err + K.sum(werr), err)
The variable prev_weights holds the previous values. Note that we added a K.update operation after the weight errors are calculated.
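The ordering matters: the penalty has to be read before the stored copy is overwritten, otherwise the difference is always zero. Here is a framework-free sketch of that bookkeeping, with each weight tensor flattened to a plain list of floats. PrevWeightPenalty is a made-up name for illustration, not a Keras class:

```python
class PrevWeightPenalty:
    """Tracks the previous weights and scores how far the current ones moved."""

    def __init__(self, sizes):
        # One zero-filled list per (flattened) weight tensor, mirroring the
        # zero initialization used in the loss class.
        self.prev = [[0.0] * n for n in sizes]

    def step(self, current):
        # 1) Penalty first: mean absolute change per tensor, summed over tensors.
        penalty = sum(
            sum(abs(c - p) for c, p in zip(cur, prev)) / len(cur)
            for cur, prev in zip(current, self.prev)
        )
        # 2) Only then overwrite the stored copy with the current values.
        self.prev = [list(cur) for cur in current]
        return penalty


tracker = PrevWeightPenalty(sizes=[2, 1])
first = tracker.step([[1.0, 3.0], [2.0]])   # measured against the zero init
second = tracker.step([[1.0, 3.0], [2.0]])  # weights unchanged since last step
third = tracker.step([[2.0, 3.0], [2.0]])   # one entry moved by 1.0
```

Because the stored copy starts at zero, the very first step is penalized against zero rather than against the initial weights.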
A sample model for testing:
model = keras.models.Sequential([
    keras.layers.Input(shape=(4,)),
    keras.layers.Dense(8),
    keras.layers.Dense(4),
    keras.layers.Dense(1),
])
loss_obj = HistoricalAvgLoss(model)
model.compile('adam', loss_obj.loss)
model.summary()
Some test data and objective function:
import numpy as np
def test_fn(x):
    return x[0]*x[1] + 2.0 * x[1]**2 + x[2]/x[3] + 3.0 * x[3]
X = np.random.rand(1000, 4)
y = np.apply_along_axis(test_fn, 1, X)
hist = model.fit(X, y, validation_split=0.25, epochs=10)
The model losses decrease over time, in my test.
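The loss above compares against the immediately preceding weights only. If the penalty you want is the squared distance to the average of all past weights (this is my reading of "historical averaging", so treat it as an assumption), the history can be kept as an incremental mean instead of a full list of snapshots. A minimal sketch in plain Python, with RunningWeightAverage as a hypothetical helper name:

```python
class RunningWeightAverage:
    """Keeps the mean of all past weight vectors without storing them."""

    def __init__(self, size):
        self.avg = [0.0] * size
        self.count = 0

    def penalty(self, weights):
        # Squared distance to the historical mean; zero while there is no history.
        if self.count == 0:
            return 0.0
        return sum((w - a) ** 2 for w, a in zip(weights, self.avg))

    def update(self, weights):
        # Incremental mean: avg_t = avg_{t-1} + (w_t - avg_{t-1}) / t
        self.count += 1
        self.avg = [a + (w - a) / self.count for w, a in zip(weights, self.avg)]


history = RunningWeightAverage(size=2)
history.update([1.0, 1.0])
history.update([3.0, 5.0])
distance = history.penalty([2.0, 4.0])  # compared against the mean [2.0, 3.0]
```

In a Keras loss the same update would have to be expressed on variables, as with prev_weights above; the sketch only shows the arithmetic.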
Upvotes: 3