Reputation: 36704
I made a minimal reproducible example with the Iris dataset. I built a complete neural network that predicts the last column of the Iris features. I also want it to output the target (category), so the network must minimize two different loss functions (one continuous, one categorical). Everything is set up for the continuous target in the example below. But how do I turn it into a multi-output problem?
import tensorflow as tf
from tensorflow.keras.layers import Dense
from tensorflow.keras import Model
from sklearn.datasets import load_iris
# Make Keras use float64 as its default float type.
tf.keras.backend.set_floatx('float64')

# Feature matrix and class labels of the Iris dataset.
iris, target = load_iris(return_X_y=True)

# Inputs: the first three feature columns. Continuous target: the fourth
# column. Categorical target: the class label.
X, y, z = iris[:, :3], iris[:, 3], target

# Yield (inputs, continuous target, class label) triples in batches of 8.
ds = (
    tf.data.Dataset
    .from_tensor_slices((X, y, z))
    .batch(8)
)
class MyModel(Model):
    """Feed-forward network with a single one-unit output layer.

    Two ReLU ``Dense`` layers (16 and 32 units) feed a one-unit ``Dense``
    layer that has no activation.
    """

    def __init__(self):
        super().__init__()
        self.d0 = Dense(16, activation='relu')
        self.d1 = Dense(32, activation='relu')
        self.d2 = Dense(1)

    def call(self, x):
        """Pass ``x`` through ``d0``, ``d1`` and ``d2`` and return the result."""
        hidden = self.d1(self.d0(x))
        return self.d2(hidden)
model = MyModel()

# Loss that is minimised for the continuous target.
loss_object = tf.keras.losses.MeanAbsoluteError()
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4)

# Running mean of the per-batch training loss. It averages the MAE loss of
# the continuous target, so it is labelled as a regression loss.
loss = tf.keras.metrics.Mean(name='regression loss')
# Running MAE between the continuous targets and the predictions.
error = tf.keras.metrics.MeanAbsoluteError()
@tf.function
def train_step(inputs, target):
    """Run one optimisation step on a batch and update the running metrics.

    The forward pass and the loss are evaluated under a gradient tape. The
    gradients of that loss with respect to ``model.trainable_variables`` are
    applied through ``optimizer``. The batch loss is then fed to ``loss`` and
    the (target, prediction) pair to ``error``.
    """
    with tf.GradientTape() as tape:
        prediction = model(inputs)
        batch_loss = loss_object(target, prediction)

    weights = model.trainable_variables
    grads = tape.gradient(batch_loss, weights)
    optimizer.apply_gradients(zip(grads, weights))

    # Feed the module-level metric accumulators.
    loss(batch_loss)
    error(target, prediction)
# The report format is the same for every epoch.
template = 'Epoch {:>2}, MAE: {:>5.2f}'

for epoch in range(50):
    for xx, yy, zz in ds: # what to do with zz, the categorical target?
        train_step(xx, yy)

    # Report the mean training loss accumulated over this epoch.
    print(template.format(epoch + 1, loss.result()))

    # Start the next epoch with empty accumulators.
    for metric in (loss, error):
        metric.reset_states()
Upvotes: 2
Views: 5814
Reputation: 36704
You can pass a list of losses to `tape.gradient`, like so:
with tf.GradientTape() as tape:
    # The model call is unpacked into a regression and a categorical prediction.
    pred_reg, pred_cat = model(inputs)
    # One loss per prediction, both computed while the tape is recording.
    reg_loss = loss_obj_reg(y_reg, pred_reg)
    cat_loss = loss_obj_cat(y_cat, pred_cat)
# Both losses are handed to the tape together, as a list of targets.
gradients = tape.gradient([reg_loss, cat_loss], model.trainable_variables)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
Full example:
import tensorflow as tf
from tensorflow.keras.layers import Dense
from tensorflow.keras import Model
from sklearn.datasets import load_iris
# Feature matrix and class labels of the Iris dataset.
iris, target = load_iris(return_X_y=True)

# Inputs (first three feature columns) and continuous target (fourth
# column), both cast to float32.
X, y = tf.cast(iris[:, :3], tf.float32), tf.cast(iris[:, 3], tf.float32)
# Categorical target: the class label.
z = target

# Shuffle with a buffer of 150 elements, then batch by 8.
ds = (
    tf.data.Dataset
    .from_tensor_slices((X, y, z))
    .shuffle(150)
    .batch(8)
)
class MyModel(Model):
    """Two shared hidden layers feeding two output layers.

    ``d0`` and ``d1`` are shared ReLU layers. ``d2`` is a one-unit layer with
    no activation and ``d3`` is a three-unit softmax layer; both read the
    output of ``d1``.
    """

    def __init__(self):
        super().__init__()
        self.d0 = Dense(16, activation='relu')
        self.d1 = Dense(32, activation='relu')
        self.d2 = Dense(1)
        self.d3 = Dense(3, activation='softmax')

    def call(self, x, training=None, **kwargs):
        """Return the pair ``(d2 output, d3 output)`` for the batch ``x``."""
        shared = self.d1(self.d0(x))
        return self.d2(shared), self.d3(shared)
model = MyModel()

# One loss object per output: MAE for the continuous prediction, sparse
# categorical cross-entropy for the class prediction.
loss_obj_reg, loss_obj_cat = (
    tf.keras.losses.MeanAbsoluteError(),
    tf.keras.losses.SparseCategoricalCrossentropy(),
)

optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4)

# Running means of the two per-batch losses.
loss_reg, loss_cat = (
    tf.keras.metrics.Mean(name='regression loss'),
    tf.keras.metrics.Mean(name='categorical loss'),
)

# Running error / accuracy computed from targets and predictions.
error_reg, error_cat = (
    tf.keras.metrics.MeanAbsoluteError(),
    tf.keras.metrics.SparseCategoricalAccuracy(),
)
@tf.function
def train_step(inputs, y_reg, y_cat):
    """Run one optimisation step on a batch for both model outputs.

    Both losses are computed under a gradient tape and passed to the tape
    together as a list. The resulting gradients are applied to
    ``model.trainable_variables`` through ``optimizer``. The four
    module-level metrics are then updated with this batch.
    """
    with tf.GradientTape() as tape:
        pred_reg, pred_cat = model(inputs)
        reg_loss = loss_obj_reg(y_reg, pred_reg)
        cat_loss = loss_obj_cat(y_cat, pred_cat)

    weights = model.trainable_variables
    # Both losses go to the tape in a single call, as a list of targets.
    grads = tape.gradient([reg_loss, cat_loss], weights)
    optimizer.apply_gradients(zip(grads, weights))

    # Running loss means.
    loss_reg(reg_loss)
    loss_cat(cat_loss)
    # Running MAE and accuracy.
    error_reg(y_reg, pred_reg)
    error_cat(y_cat, pred_cat)
template = ('Epoch {:>3}, SCCE: {:>5.2f},'
            ' MAE: {:>4.2f}, SAcc: {:>5.1%}')

for epoch in range(150):
    for xx, yy, zz in ds:
        train_step(xx, yy, zz)

    # Report every tenth epoch.
    if (epoch + 1) % 10 == 0:
        report = template.format(
            epoch + 1,
            loss_cat.result(),
            error_reg.result(),
            error_cat.result(),
        )
        print(report)

    # Clear all accumulators so each epoch is measured on its own.
    for metric in (loss_reg, loss_cat, error_reg, error_cat):
        metric.reset_states()
Epoch 10, SCCE: 1.41, MAE: 0.36, SAcc: 33.3%
Epoch 20, SCCE: 1.14, MAE: 0.31, SAcc: 44.0%
Epoch 30, SCCE: 1.05, MAE: 0.26, SAcc: 41.3%
Epoch 40, SCCE: 0.99, MAE: 0.21, SAcc: 40.0%
Epoch 50, SCCE: 0.94, MAE: 0.19, SAcc: 40.0%
Epoch 60, SCCE: 0.88, MAE: 0.18, SAcc: 40.0%
Epoch 70, SCCE: 0.83, MAE: 0.17, SAcc: 44.7%
Epoch 80, SCCE: 0.77, MAE: 0.17, SAcc: 75.3%
Epoch 90, SCCE: 0.70, MAE: 0.17, SAcc: 76.7%
Epoch 100, SCCE: 0.64, MAE: 0.17, SAcc: 82.7%
Epoch 110, SCCE: 0.58, MAE: 0.16, SAcc: 82.7%
Epoch 120, SCCE: 0.54, MAE: 0.16, SAcc: 88.0%
Epoch 130, SCCE: 0.50, MAE: 0.16, SAcc: 88.7%
Epoch 140, SCCE: 0.47, MAE: 0.16, SAcc: 90.7%
Epoch 150, SCCE: 0.45, MAE: 0.16, SAcc: 90.0%
With this output you can see both losses are being minimized.
Upvotes: 6
Reputation: 720
To solve the multi-task learning problem, the following modules are imported.
import tensorflow as tf
from tensorflow.keras.layers import Dense
from tensorflow.keras import Model
from sklearn.datasets import load_iris
from tensorflow.keras.utils import to_categorical
import tensorflow.keras.backend as K
tf.keras.backend.set_floatx('float64')
import numpy as np
Then, we define a multi-output network as shown below:
x
| Dense(16)
x
| Dense(32)
x
Dense(1) / \ Dense(4, softmax)
/ \
(continuous) y_cont y_cat (categorical)
The code is shown below:
class MyModel(Model):
    """Two shared hidden layers feeding a continuous and a categorical output.

    ``d0`` and ``d1`` are shared ReLU layers. ``cont`` is a one-unit layer
    with no activation and ``cat`` is a softmax layer; both read the output
    of ``d1``.
    """

    def __init__(self):
        super().__init__()
        self.d0 = Dense(16, activation='relu')
        self.d1 = Dense(32, activation='relu')
        self.cont = Dense(1)  # Continuous output
        # NOTE(review): Iris has 3 classes, so with sparse labels 0..2 the
        # fourth unit is never a target; Dense(3) would be enough.
        self.cat = Dense(4, activation='softmax')  # Categorical output

    def call(self, x):
        """Return ``(y_cont, y_cat)`` for the input batch ``x``."""
        x = self.d0(x)
        x = self.d1(x)
        y_cont = self.cont(x)
        y_cat = self.cat(x)
        return y_cont, y_cat


model = MyModel()
Next, we define the loss function and an optimizer. We use joint training: the loss function is the sum of the mean absolute error for the continuous variable and the cross-entropy for the categorical variable.
# Per-output losses: MAE for the continuous output, sparse categorical
# cross-entropy for the categorical output.
cont_loss_func = tf.keras.losses.MeanAbsoluteError()
cat_loss_func = tf.keras.losses.SparseCategoricalCrossentropy()


def cont_cat_loss_func(real_cont, pred_cont, real_cat, pred_cat):
    """Return the joint loss: categorical cross-entropy plus continuous MAE."""
    cat_term = cat_loss_func(real_cat, pred_cat)
    cont_term = cont_loss_func(real_cont, pred_cont)
    return cat_term + cont_term


optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4)
The train step is defined as follows:
@tf.function
def train_step(inputs, target_cont, target_cat):
    """Run one joint-loss optimisation step and return both predictions.

    The summed loss from ``cont_cat_loss_func`` is differentiated with
    respect to ``model.trainable_variables`` and applied through
    ``optimizer``.

    Returns:
        The ``(continuous, categorical)`` model outputs for this batch.
    """
    with tf.GradientTape() as tape:
        # Forward pass and joint loss, recorded by the tape.
        pred_cont, pred_cat = model(inputs)
        joint_loss = cont_cat_loss_func(target_cont, pred_cont, target_cat, pred_cat)

    # Backpropagation.
    weights = model.trainable_variables
    grads = tape.gradient(joint_loss, weights)
    optimizer.apply_gradients(zip(grads, weights))
    return pred_cont, pred_cat
We train the network for 50 epochs and the performance of the model for each epoch will be shown during training.
# Model performance, accumulated over the batches of one epoch.
acc_res = tf.keras.metrics.Accuracy()
mae_res = tf.keras.metrics.MeanAbsoluteError()

# The report format is the same for every epoch.
template = 'Epoch {:>2}, Accuracy: {:>5.2f}, MAE: {:>5.2f}'

# NOTE(review): `ds` is not defined in this answer; presumably it is the
# batched dataset of (inputs, continuous target, class label) triples built
# in the question.
for epoch in range(50):
    for xx, yy, zz in ds:
        out_cont, out_cat = train_step(xx, yy, zz)
        # The predicted class is the index of the largest softmax output.
        acc_res.update_state(zz, np.argmax(out_cat, axis=1))
        mae_res.update_state(yy, out_cont)

    print(template.format(epoch + 1, acc_res.result(), mae_res.result()))

    # Start the next epoch with empty accumulators.
    acc_res.reset_states()
    mae_res.reset_states()
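Instead of using joint training (i.e. explicitly summing the losses of the continuous and the categorical variable), @thushv89 uses a different method to calculate the loss of the network: a list with one loss per output is passed to `tape.gradient`. But I don't quite understand how it works. (For reference: when `tape.gradient` is given several targets, it returns the gradient of their sum, so the resulting update is the same as with the explicitly summed loss.)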
# One loss object per model output: MAE first, sparse categorical cross-entropy second.
loss_objects = [tf.keras.losses.MeanAbsoluteError(), tf.keras.losses.SparseCategoricalCrossentropy()]
# Pair each loss object with the output and target at the same position and
# evaluate it as loss(target, output); the result is a list of per-output losses.
losses = [l(t, o) for l,o,t in zip(loss_objects, outputs, targets)]
Upvotes: 2
Reputation: 11333
You can do the following. I hope you just need a multi-output network. Here I'm creating a model that looks as follows. But even if you need two separate models, you should be able to port this easily.
x
| Dense(16)
x
| Dense(32)
x
Dense(1) / \ Dense(4, softmax)
/ \
(cont) y_1 y_2 (categorical)
import tensorflow as tf
from tensorflow.keras.layers import Dense
from tensorflow.keras import Model
from sklearn.datasets import load_iris
from tensorflow.keras.utils import to_categorical
import tensorflow.keras.backend as K
tf.keras.backend.set_floatx('float64')
import numpy as np
# Feature matrix and class labels of the Iris dataset.
iris, target = load_iris(return_X_y=True)

# Clear any existing Keras session state before the model is built.
K.clear_session()

# Inputs: the first three feature columns. Continuous target: the fourth
# column. Categorical target: the class label.
X, y, z = iris[:, :3], iris[:, 3], target

# Shuffle with a buffer of 150 elements, then batch by 32.
ds = (
    tf.data.Dataset
    .from_tensor_slices((X, y, z))
    .shuffle(buffer_size=150)
    .batch(32)
)
class MyModel(Model):
    """Two shared hidden layers feeding a continuous and a categorical output.

    ``d0`` and ``d1`` are shared ReLU layers. ``d2_1`` is a one-unit layer
    with no activation and ``d2_2`` is a softmax layer; both read the output
    of ``d1``.
    """

    def __init__(self):
        super().__init__()
        self.d0 = Dense(16, activation='relu')
        self.d1 = Dense(32, activation='relu')
        self.d2_1 = Dense(1)
        # NOTE(review): Iris has 3 classes, so with sparse labels 0..2 the
        # fourth unit is never a target; Dense(3) would be enough.
        self.d2_2 = Dense(4, activation='softmax')

    def call(self, x):
        """Return ``(d2_1 output, d2_2 output)`` for the input batch ``x``."""
        shared = self.d1(self.d0(x))
        return self.d2_1(shared), self.d2_2(shared)
model = MyModel()

# One loss object per model output, in the order the model returns them:
# MAE for the continuous output, sparse categorical cross-entropy for the
# categorical output.
loss_objects = [tf.keras.losses.MeanAbsoluteError(), tf.keras.losses.SparseCategoricalCrossentropy()]
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)

# Running accuracy of the predicted class. The metric measures accuracy, not
# a loss, so it is named accordingly.
acc = tf.keras.metrics.Accuracy(name='categorical accuracy')
# Running MAE of the continuous output (a metric, despite the variable name).
loss = tf.keras.metrics.MeanAbsoluteError()
@tf.function
def train_step(inputs, targets):
    """Run one optimisation step on a batch and return the model outputs.

    ``targets`` is matched position by position with the model outputs and
    with ``loss_objects``. The resulting list of per-output losses is passed
    to the tape in one call, and the gradients are applied to
    ``model.trainable_variables`` through ``optimizer``.
    """
    with tf.GradientTape() as tape:
        outputs = model(inputs)
        per_output_losses = [
            loss_fn(target, output)
            for loss_fn, output, target in zip(loss_objects, outputs, targets)
        ]

    weights = model.trainable_variables
    grads = tape.gradient(per_output_losses, weights)
    optimizer.apply_gradients(zip(grads, weights))
    return outputs
# The report format is the same for every epoch.
template = 'Epoch {:>2}, Accuracy: {:>5.2f}, MAE: {:>5.2f}'

for epoch in range(50):
    for xx, yy, zz in ds:
        # Both targets are passed, in the same order as the model outputs:
        # continuous first, categorical second.
        outs = train_step(xx, [yy, zz])
        # The predicted class is the index of the largest softmax output.
        acc.update_state(zz, np.argmax(outs[1], axis=1))
        loss.update_state(yy, outs[0])

    print(template.format(epoch + 1, acc.result(), loss.result()))

    # Start the next epoch with empty accumulators.
    acc.reset_states()
    loss.reset_states()
Upvotes: 4