AndAgio
AndAgio

Reputation: 11

Error using Keras ImageDataGenerator with custom train_step in subclassed model

I am trying to code a knowledge distillation model using keras. I started from keras example here. In order to train the model the train_step and test_step method were overwritten. Unlike keras example, I want to fit the model using an ImageDataGenerator object to preprocess the images in the CIFAR10 dataset. The problem is that whenever I call the model.fit function passing X_train and Y_train the training works fine, if instead I call model.fit passing the ImageDataGenerator.flow(X_train, Y_train, batch_size) the code returns the following error:

NotImplementedError: When subclassing the Model class, you should implement a call method.

I have also tried to modify the way that train_step handles the data input that it receives, but it seems that no approach has worked so far.

Why is that so? Is there any issue with overwriting the train_step method of the Model class with ImageDataGenereator objects? Should the fit method of the class Model be overwritten too?

To make things clear and reproducible here is the sample code:

import time
import copy
import tensorflow as tf
import keras
from keras import regularizers
from keras.engine import Model
from keras.layers import Dropout, Flatten, Dense, Conv2D, MaxPooling2D, Activation, BatchNormalization
from keras.models import Sequential
from keras.datasets import cifar10
from keras.preprocessing.image import ImageDataGenerator
from keras.utils import np_utils
from tensorflow.python.keras.engine import data_adapter
# Imported from files
import settings_parser
from utils import progressive_learning_rate
from teacher import Teacher, build_teacher
from student import Student, build_student


class Distiller(tf.keras.Model):
    def __init__(self, student, teacher):
        super(Distiller, self).__init__()
        self.teacher = teacher
        self.student = student


    def compile(self, optimizer, metrics, student_loss_fn, distillation_loss_fn, alpha=0.1, temperature=3):
        """ Configure the distiller.
        Args:
            optimizer: Keras optimizer for the student weights
            metrics: Keras metrics for evaluation
            student_loss_fn: Loss function of difference between student
                predictions and ground-truth
            distillation_loss_fn: Loss function of difference between soft
                student predictions and soft teacher predictions
            alpha: weight to student_loss_fn and 1-alpha to distillation_loss_fn
            temperature: Temperature for softening probability distributions.
                Larger temperature gives softer distributions.
        """
        super(Distiller, self).compile(optimizer=optimizer, metrics=metrics)
        self.student_loss_fn = student_loss_fn
        self.distillation_loss_fn = distillation_loss_fn
        self.alpha = alpha
        self.temperature = temperature

    # @tf.function
    def train_step(self, data):
        # Treat data in different ways if it is a tuple or an iterator
        x = None
        y = None
        if isinstance(data, tuple):
            x, y = data
        if isinstance(data, tf.keras.preprocessing.image.NumpyArrayIterator):
            x, y = data.next()
        # Forward pass of teacher
        teacher_predictions = self.teacher(x, training=False)

        with tf.GradientTape() as tape:
            # Forward pass of student
            student_predictions = self.student(x, training=True)
            # Compute losses
            student_loss = self.student_loss_fn(y, student_predictions)
            distillation_loss = self.distillation_loss_fn(
                tf.nn.softmax(teacher_predictions / self.temperature, axis=1),
                tf.nn.softmax(student_predictions / self.temperature, axis=1),
            )
            loss = self.alpha * student_loss + (1 - self.alpha) * distillation_loss
        # Compute gradients
        trainable_vars = self.student.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)
        # Update weights
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))
        # Update the metrics configured in `compile()`.
        self.compiled_metrics.update_state(y, student_predictions)
        # Return a dict of performance
        results = {m.name: m.result() for m in self.metrics}
        results.update(
            {"student_loss": student_loss, "distillation_loss": distillation_loss}
        )
        return results

    # @tf.function
    def test_step(self, data):
        # Treat data in different ways if it is a tuple or an iterator
        x = None
        y = None
        if isinstance(data, tuple):
            x, y = data
        if isinstance(data, tf.keras.preprocessing.image.NumpyArrayIterator):
            x, y = data.next()

        # Compute predictions
        y_prediction = self.student(x, training=False)
        # Calculate the loss
        student_loss = self.student_loss_fn(y, y_prediction)
        # Update the metrics.
        self.compiled_metrics.update_state(y, y_prediction)
        # Return a dict of performance
        results = {m.name: m.result() for m in self.metrics}
        results.update({"student_loss": student_loss})

        return results


#Define method to build the teacher model (VGG16)
def build_teacher():
    input = keras.Input(shape=(32, 32, 3), name="img")
    x = Conv2D(64, (3, 3), padding='same', kernel_regularizer=regularizers.l2(0.0005))(input)
    x = Activation('relu')(x)
    x = BatchNormalization()(x)
    x = Dropout(0.3)(x)
    # Block 2
    x = Conv2D(64, (3, 3), padding='same', kernel_regularizer=regularizers.l2(0.0005))(x)
    x = Activation('relu')(x)
    x = BatchNormalization()(x)
    x = MaxPooling2D(pool_size=(2, 2))(x)
    # Block 3
    x = Conv2D(128, (3, 3), padding='same', kernel_regularizer=regularizers.l2(0.0005))(x)
    x = Activation('relu')(x)
    x = BatchNormalization()(x)
    x = Dropout(0.4)(x)
    # Block 4
    x = Conv2D(128, (3, 3), padding='same', kernel_regularizer=regularizers.l2(0.0005))(x)
    x = Activation('relu')(x)
    x = BatchNormalization()(x)
    x = MaxPooling2D(pool_size=(2, 2))(x)
    # Block 5
    x = Conv2D(256, (3, 3), padding='same', kernel_regularizer=regularizers.l2(0.0005))(x)
    x = Activation('relu')(x)
    x = BatchNormalization()(x)
    x = Dropout(0.4)(x)
    # Block 6
    x = Conv2D(256, (3, 3), padding='same', kernel_regularizer=regularizers.l2(0.0005))(x)
    x = Activation('relu')(x)
    x = BatchNormalization()(x)
    x = Dropout(0.4)(x)
    # Block 7
    x = Conv2D(256, (3, 3), padding='same', kernel_regularizer=regularizers.l2(0.0005))(x)
    x = Activation('relu')(x)
    x = BatchNormalization()(x)
    x = MaxPooling2D(pool_size=(2, 2))(x)
    # Block 8
    x = Conv2D(512, (3, 3), padding='same', kernel_regularizer=regularizers.l2(0.0005))(x)
    x = Activation('relu')(x)
    x = BatchNormalization()(x)
    x = Dropout(0.4)(x)
    # Block 9
    x = Conv2D(512, (3, 3), padding='same', kernel_regularizer=regularizers.l2(0.0005))(x)
    x = Activation('relu')(x)
    x = BatchNormalization()(x)
    x = Dropout(0.4)(x)
    # Block 10
    x = Conv2D(512, (3, 3), padding='same', kernel_regularizer=regularizers.l2(0.0005))(x)
    x = Activation('relu')(x)
    x = BatchNormalization()(x)
    x = MaxPooling2D(pool_size=(2, 2))(x)
    # Block 11
    x = Conv2D(512, (3, 3), padding='same', kernel_regularizer=regularizers.l2(0.0005))(x)
    x = Activation('relu')(x)
    x = BatchNormalization()(x)
    x = Dropout(0.4)(x)
    # Block 12
    x = Conv2D(512, (3, 3), padding='same', kernel_regularizer=regularizers.l2(0.0005))(x)
    x = Activation('relu')(x)
    x = BatchNormalization()(x)
    x = Dropout(0.4)(x)
    # Block 13
    x = Conv2D(512, (3, 3), padding='same', kernel_regularizer=regularizers.l2(0.0005))(x)
    x = Activation('relu')(x)
    x = BatchNormalization()(x)
    x = MaxPooling2D(pool_size=(2, 2))(x)
    x = Dropout(0.5)(x)
    # Flatten and classification
    x = Flatten()(x)
    x = Dense(512)(x)
    x = Activation('relu')(x)
    x = BatchNormalization()(x)
    x = Dropout(0.5)(x)
    # Out
    x = Dense(10)(x)
    output = Activation('softmax')(x)
    # Define model from input and output
    model = keras.Model(input, output, name="teacher")
    print(model.summary())

    return model


#Define method to build the teacher model (VGG16)
def build_student():
    input = keras.Input(shape=(32, 32, 3), name="img")
    x = Conv2D(64, (3, 3), padding='same', kernel_regularizer=regularizers.l2(0.0005))(input)
    x = Activation('relu')(x)
    x = BatchNormalization()(x)
    x = Dropout(0.3)(x)
    # Block 2
    x = Conv2D(128, (3, 3), padding='same', kernel_regularizer=regularizers.l2(0.0005))(x)
    x = Activation('relu')(x)
    x = BatchNormalization()(x)
    x = MaxPooling2D(pool_size=(2, 2))(x)
    # Block 3
    x = Conv2D(128, (3, 3), padding='same', kernel_regularizer=regularizers.l2(0.0005))(x)
    x = Activation('relu')(x)
    x = BatchNormalization()(x)
    x = Dropout(0.4)(x)
    # Block 4
    x = Conv2D(128, (3, 3), padding='same', kernel_regularizer=regularizers.l2(0.0005))(x)
    x = Activation('relu')(x)
    x = BatchNormalization()(x)
    x = MaxPooling2D(pool_size=(2, 2))(x)
    # Block 5
    x = Conv2D(256, (3, 3), padding='same', kernel_regularizer=regularizers.l2(0.0005))(x)
    x = Activation('relu')(x)
    x = BatchNormalization()(x)
    x = Dropout(0.4)(x)
    x = MaxPooling2D(pool_size=(2, 2))(x)
    # Flatten and classification
    x = Flatten()(x)
    x = Dense(512)(x)
    x = Activation('relu')(x)
    x = BatchNormalization()(x)
    x = Dropout(0.5)(x)
    # Out
    x = Dense(10)(x)
    output = Activation('softmax')(x)
    # Define model from input and output
    model = keras.Model(input, output, name="student")
    print(model.summary())

    return model

if __name__ == '__main__':
    args = settings_parser.arg_parse()
    print_during_epochs = True
    student = build_student()
    student_clone = build_student()
    student_clone.set_weights(student.get_weights())
    teacher = build_teacher()

    (X_train, y_train), (X_test, y_test) = cifar10.load_data()
    X_train = X_train.astype('float32')
    X_test = X_test.astype('float32')
    Y_train = np_utils.to_categorical(y_train, 10)
    Y_test = np_utils.to_categorical(y_test, 10)
    train_datagen = ImageDataGenerator(
        rescale=1. / 255,  # rescale input image
        featurewise_center=False,  # set input mean to 0 over the dataset
        samplewise_center=False,  # set each sample mean to 0
        featurewise_std_normalization=False,  # divide inputs by std of the dataset
        samplewise_std_normalization=False,  # divide each input by its std
        zca_whitening=False,  # apply ZCA whitening
        rotation_range=15,  # randomly rotate images in the range (degrees, 0 to 180)
        width_shift_range=0.1,  # randomly shift images horizontally (fraction of total width)
        height_shift_range=0.1,  # randomly shift images vertically (fraction of total height)
        horizontal_flip=True,  # randomly flip images
        vertical_flip=False)  # randomly flip images)

    train_datagen.fit(X_train)
    train_generator = train_datagen.flow(X_train, Y_train, batch_size=64)

    test_datagen = ImageDataGenerator(rescale=1. / 255)
    test_generator = test_datagen.flow(X_test, Y_test, batch_size=64)

    # Train teacher as usual
    teacher.compile(optimizer=keras.optimizers.SGD(),
                    loss=keras.losses.categorical_crossentropy,
                    metrics=['accuracy'])

    # Train and evaluate teacher on data.
    teacher.fit(train_generator, validation_data=test_generator, epochs=5, verbose=print_during_epochs)
    loss, acc = teacher.evaluate(test_generator)
    print("Teacher model, accuracy: {:5.2f}%".format(100 * acc))

    # Train student as doen usually
    student_clone.compile(optimizer=keras.optimizers.SGD(),
                          loss=keras.losses.categorical_crossentropy,
                          metrics=['accuracy'])
    # Train and evaluate student trained from scratch.
    student_clone.fit(train_generator, validation_data=test_generator, epochs=5, verbose=print_during_epochs)
    loss, acc = student_clone.evaluate(test_generator)
    print("Student scratch model, accuracy: {:5.2f}%".format(100 * acc))

    #print('{}\n\n{}'.format(teacher.summary(), student_clone.summary()))

    # Train student using knowledge distillation
    distiller = Distiller(student=student, teacher=teacher)
    distiller.compile(optimizer=keras.optimizers.SGD(),
                      metrics=['accuracy'],
                      student_loss_fn=keras.losses.CategoricalCrossentropy(),  # categorical_crossentropy,
                      distillation_loss_fn=keras.losses.KLDivergence(),
                      alpha=0.1,
                      temperature=10)
    # Distill teacher to student

    distiller.fit(X_train, Y_train, epochs=5) #THIS WORKS FINE
    distiller.fit(train_generator, validation_data=test_generator, epochs=5,
                  verbose=print_during_epochs)  # THIS DOESN'T WORK



    # Evaluate student on test dataset
    loss, acc = distiller.evaluate(test_generator)
    print("Student distilled model, accuracy: {:5.2f}%".format(100 * acc))

Upvotes: 1

Views: 1088

Answers (2)

Hammad Tariq
Hammad Tariq

Reputation: 1

By Adding call() Method and enabling eager execution in compile() method, this problem resolves.

i.e. Add this line in compile Method super(GAN, self).compile(run_eagerly=True)

Please following example code in which this problem is resolved.

############   Customize what happens in Model.fit and Building GAN Like Structure #############


from tensorflow import keras
from keras.layers import Dense, Flatten, Reshape, Input, InputLayer, Conv2D, MaxPooling2D, ZeroPadding2D, UpSampling3D, MaxPooling3D, UpSampling2D, Conv3D, Conv2DTranspose
from keras.models import Sequential, Model
import numpy as np
import tensorflow as tf
from keras import layers
from keras import backend as K


# ## Design model

def upsample(filters, size, norm_type='batchnorm', apply_dropout=False):
  """Upsamples an input.
  Conv2DTranspose => Batchnorm => Dropout => Relu
  Args:
    filters: number of filters
    size: filter size
    norm_type: Normalization type; either 'batchnorm' or 'instancenorm'.
    apply_dropout: If True, adds the dropout layer
  Returns:
    Upsample Sequential Model
  """

  initializer = tf.random_normal_initializer(0., 0.02)

  result = tf.keras.Sequential()
  result.add(
      tf.keras.layers.Conv2DTranspose(filters, size, strides=2,
                                      padding='same',
                                      kernel_initializer=initializer,
                                      use_bias=False))

  if norm_type.lower() == 'batchnorm':
    result.add(tf.keras.layers.BatchNormalization())
  elif norm_type.lower() == 'instancenorm':
    result.add(InstanceNormalization())

  if apply_dropout:
    result.add(tf.keras.layers.Dropout(0.5))

  result.add(tf.keras.layers.ReLU())

  return result


# Create the generator

norm_type = 'batchnorm'

generator_resnet_model = tf.keras.applications.ResNet50V2(
    include_top=False,
    weights=None,
    input_tensor=None,
    input_shape=(224,224,3),
    pooling=max,
)


Hog_Feature_Vector_generator = keras.Sequential(
    [
        keras.Input( shape=[224,224,3] ),
        generator_resnet_model,
        upsample(1024, 3, norm_type, apply_dropout=True),
        upsample(512, 3, norm_type, apply_dropout=True),
        upsample(256, 3, norm_type, apply_dropout=True),
        Conv2D(128, 3, strides=(1, 1), padding='same', activation='relu'),
        Conv2D(64, 3, strides=(1, 1), padding='same', activation='relu'),
        Conv2D(32, 3, strides=(1, 1), padding='same', activation='relu'),
        Conv2D(16, 3, strides=(1, 1), padding='same', activation='relu'),
        Conv2D(9, 3, strides=(1, 1), padding='valid', activation='relu'),
        layers.Reshape((26244,)),
    ],
    name="Hog_Feature_Vector_generator",
)

print(Hog_Feature_Vector_generator.summary())

# Create 2 Discriminators 

classifier_discriminator_resnet_model = tf.keras.applications.ResNet50V2(
    include_top=True,
    weights=None,
    input_tensor=None,
    input_shape=(216,216,3),
    pooling=max,
    classes=4,
    classifier_activation=None,
)


Hog_Feature_Vector_Classifier = keras.Sequential(
    [
        keras.Input(shape=[26244,]),
        layers.Reshape((54,54,9)),
        upsample(18, 3, norm_type, apply_dropout=True),
        upsample(36, 3, norm_type, apply_dropout=True),
        Conv2D(18, 3, strides=(1, 1), padding='same', activation='relu'),
        Conv2D(3, 3, strides=(1, 1), padding='same', activation='relu'),
        classifier_discriminator_resnet_model,
    ],
    name="Hog_Feature_Vector_Classifier",
)

print(Hog_Feature_Vector_Classifier.summary())

real_vs_fake_discriminator_resnet_model = tf.keras.applications.ResNet50V2(
    include_top=True,
    weights=None,
    input_tensor=None,
    input_shape=(216,216,3),
    pooling=max,
    classes=1,
    classifier_activation="softmax",
)


real_vs_fake_Hog_Feature_Vector_Classifier = keras.Sequential(
    [
        keras.Input(shape=[26244,]),
        layers.Reshape((54,54,9)),
        upsample(18, 3, norm_type, apply_dropout=True),
        upsample(36, 3, norm_type, apply_dropout=True),
        Conv2D(18, 3, strides=(1, 1), padding='same', activation='relu'),
        Conv2D(3, 3, strides=(1, 1), padding='same', activation='relu'),
        real_vs_fake_discriminator_resnet_model,
    ],
    name="real_vs_fake_Hog_Feature_Vector_Classifier",
)

print(real_vs_fake_Hog_Feature_Vector_Classifier.summary())


class GAN(keras.Model):
  def __init__(self, discriminator_classifier, discriminator_fake_vs_real, generator):
    super(GAN, self).__init__()
    self.discriminator_classifier = discriminator_classifier
    self.discriminator_fake_vs_real = discriminator_fake_vs_real
    self.generator = generator

  def call(self, input):
    private_fvs = self.generator(input)
    dec1_output = self.discriminator_classifier(private_fvs)
    dec2_output = self.discriminator_fake_vs_real(private_fvs)

    return private_fvs, dec1_output, dec2_output

  def compile(self, d1_optimizer, d2_optimizer, g_optimizer, loss_fn_d1, loss_fn_d2):
    super(GAN, self).compile(run_eagerly=True)
    self.d1_optimizer = d1_optimizer
    self.d2_optimizer = d2_optimizer
    self.g_optimizer = g_optimizer
    self.loss_fn_d1 = loss_fn_d1
    self.loss_fn_d2 = loss_fn_d2

  def train_step(self, data):
    print(f"Eager execution mode: {tf.executing_eagerly()}")
    # inp, trainLabels = data
    # trainImages, trainFVs = inp
    trainImages, trainFVs, trainLabels = data

    ## Inversing Labels for defining Not a Classidier #####
    ones_array = np.ones( tf.shape (trainLabels) ) 
    inverse_labels = ones_array - trainLabels
    
    batch_size = tf.shape(trainImages)[0]
    
    # Generate Private HoG Feature Vectors  
    generated_hog_fds = self.generator(trainImages)
    
    # Combine them with real Feature Vectors
    combined_features = tf.concat([generated_hog_fds, trainFVs], axis=0)
    
    # Assemble labels discriminating real from fake Feature Vectors 
    labels = tf.concat([tf.ones((batch_size, 1)), tf.zeros((batch_size, 1))], axis=0)
    
    # Add random noise to the labels - important trick!
    labels += 0.05 * tf.random.uniform(tf.shape(labels))
    
    # Train the Fake vs Real discriminator / d2
    with tf.GradientTape() as tape:
      predictions = self.discriminator_fake_vs_real(combined_features)
      d2_loss = self.loss_fn_d2(labels, predictions)
    grads = tape.gradient(d2_loss, self.discriminator_fake_vs_real.trainable_weights)
    self.d2_optimizer.apply_gradients(
        zip(grads, self.discriminator_fake_vs_real.trainable_weights)
          )
      
    # Train the discriminator Classifier / d1
    with tf.GradientTape() as tape:
      predictions = self.discriminator_classifier(generated_hog_fds)
      d1_loss = self.loss_fn_d1(inverse_labels, predictions)
      
    grads = tape.gradient(d1_loss, self.discriminator_classifier.trainable_weights)
    self.d1_optimizer.apply_gradients(
        zip(grads, self.discriminator_classifier.trainable_weights)
        )
    
    # Assemble labels that say "all real images"
    misleading_labels = tf.zeros((batch_size, 1))
    
    # Train the generator with computing loss from both discriminators (note that we should *not* update the weights
    # of the discriminator)!
    with tf.GradientTape() as tape:
      predictions1 = self.discriminator_classifier(self.generator(trainImages))
      predictions2 = self.discriminator_fake_vs_real(self.generator(trainImages))
      g_loss_d1 = self.loss_fn_d1(inverse_labels, predictions1)
      g_loss_d2 = self.loss_fn_d2(misleading_labels, predictions2)
      g_loss = g_loss_d1 + g_loss_d2   
    
    grads = tape.gradient(g_loss, self.generator.trainable_weights)
    self.g_optimizer.apply_gradients(zip(grads, self.generator.trainable_weights))
    
    return {"d1_loss": d1_loss, "d2_loss": d2_loss, "g_loss": g_loss}


gan = GAN(discriminator_classifier=Hog_Feature_Vector_Classifier,
          discriminator_fake_vs_real = real_vs_fake_Hog_Feature_Vector_Classifier,
          generator=Hog_Feature_Vector_generator
          )

gan.compile(
    d1_optimizer=keras.optimizers.Adam(learning_rate=0.0003),
    d2_optimizer=keras.optimizers.Adam(learning_rate=0.0003),
    g_optimizer=keras.optimizers.Adam(learning_rate=0.0003),
    loss_fn_d1= keras.losses.categorical_crossentropy,
    loss_fn_d2= keras.losses.BinaryCrossentropy(from_logits= True)
)


gan.fit(training_generator, epochs=2,
        verbose = 2)

Upvotes: 0

Yoan B. M.Sc
Yoan B. M.Sc

Reputation: 1505

Try adding a call method to your Distiller class. Here is an example on a autoencoder using a subclass of the keras.Model as well :

class LSTM_Detector(Model):
  def __init__(self, flight_len, param_len):
    super(LSTM_Detector, self).__init__()
    self.input_dim = (flight_len, param_len)
    self.encoder = tf.keras.Sequential([
      layers.LSTM(16,
                  return_sequences=True,
                  activation="relu",
                  input_shape=self.input_dim),
      ])
    
    self.decoder = tf.keras.Sequential([
      layers.LSTM(16,
                  return_sequences=True,
                  activation="relu"),
      layers.TimeDistributed(layers.Dense(self.input_dim[1]))
      ])
    
  def call(self, x):
    encoded = self.encoder(x)
    decoded = self.decoder(encoded)
    return decoded

Upvotes: 0

Related Questions