ellek

Reputation: 165

Applying Normalization to Inputs in Tensorflow

I have created a custom class for an ML model, and it is working fine, but I would like to normalize the inputs since they span a wide range of values (e.g. 0, 20000, 500, 10, 8). Currently I'm normalizing by applying lambda x: np.log(x + 1) to each input (the +1 is so it doesn't error out when 0 is passed in). Would a normalization layer be better than my current approach? If so, how would I go about implementing it? My code for the model is below, followed by a minimal sketch of the transform I'm applying:

import numpy as np
import pandas as pd
import tensorflow as tf


class FollowModel:
    def __init__(self, input_shape, output_shape, hidden_layers, input_labels, learning_rate=0.001):
        tf.reset_default_graph()
        assert len(input_labels) == input_shape[1], 'Incorrect number of input labels!'

        # Placeholders for input and output data
        self.input_labels = input_labels
        self.input_shape = input_shape
        self.output_shape = output_shape
        self.X = tf.placeholder(shape=input_shape, dtype=tf.float64, name='X')
        self.y = tf.placeholder(shape=output_shape, dtype=tf.float64, name='y')
        self.hidden_layers = hidden_layers
        self.learning_rate = learning_rate

        # Variables for the two groups of weights between the three layers of the network
        self.W1 = tf.Variable(np.random.rand(input_shape[1], hidden_layers), dtype=tf.float64)
        self.W2 = tf.Variable(np.random.rand(hidden_layers, output_shape[1]), dtype=tf.float64)

        # Create the neural net graph
        self.A1 = tf.sigmoid(tf.matmul(self.X, self.W1))
        self.y_est = tf.sigmoid(tf.matmul(self.A1, self.W2))

        # Define a loss function
        self.deltas = tf.square(self.y_est - self.y)  # want this to be 0
        self.loss = tf.reduce_sum(self.deltas)

        # Define a train operation to minimize the loss
        self.optimizer = tf.train.AdamOptimizer(learning_rate).minimize(self.loss)

        # initialize variables
        self.model_init = tf.global_variables_initializer()
        self.trained = False


    def train(self, Xtrain, ytrain, Xtest, ytest, training_steps, batch_size, print_progress=True):
        # initialize session
        self.trained = True
        self.training_steps = training_steps
        self.batch_size = batch_size
        self.sess = tf.Session()
        self.sess.run(self.model_init)
        self.losses = []
        self.accs = []
        self.testing_accuracies = []

        for i in range(training_steps*batch_size):
            self.sess.run(self.optimizer, feed_dict={self.X: Xtrain, self.y: ytrain})
            local_loss = self.sess.run(self.loss, feed_dict={self.X: Xtrain.values, self.y: ytrain.values})
            self.losses.append(local_loss)
            self.weights1 = self.sess.run(self.W1)
            self.weights2 = self.sess.run(self.W2)

            y_est_np = self.sess.run(self.y_est, feed_dict={self.X: Xtrain.values, self.y: ytrain.values})
            correct = [estimate.argmax(axis=0) == target.argmax(axis=0)
                       for estimate, target in zip(y_est_np, ytrain.values)]
            acc = 100 * sum(correct) / len(correct)
            self.accs.append(acc)

            if i % batch_size == 0:
                batch_num = i // batch_size
                if batch_num % 5 == 0:
                    self.testing_accuracies.append(self.test_accuracy(Xtest, ytest, False, True))
                temp_table = pd.concat([Xtrain, ytrain], axis=1).sample(frac=1)
                column_names = list(temp_table.columns.values)
                X_columns, y_columns = column_names[0:len(column_names) - 2], column_names[len(column_names) - 2:]
                Xtrain = temp_table[X_columns]
                ytrain = temp_table[y_columns]
                if print_progress: print('Step: %d, Accuracy: %.2f, Loss: %.2f' % (int(i/batch_size), acc, local_loss))

        if print_progress: print("Training complete!\nloss: {}, hidden nodes: {}, steps: {}, epoch size: {}, total steps: {}".format(int(self.losses[-1]*100)/100, self.hidden_layers, training_steps, batch_size, training_steps*batch_size))
        self.follow_accuracy = acc
        return acc


    def test_accuracy(self, Xtest, ytest, print_progress=True, return_accuracy=False):
        if self.trained:
            X = tf.placeholder(shape=Xtest.shape, dtype=tf.float64, name='X')
            y = tf.placeholder(shape=ytest.shape, dtype=tf.float64, name='y')
            W1 = tf.Variable(self.weights1)
            W2 = tf.Variable(self.weights2)
            A1 = tf.sigmoid(tf.matmul(X, W1))
            y_est = tf.sigmoid(tf.matmul(A1, W2))

            # Calculate the predicted outputs
            init = tf.global_variables_initializer()
            with tf.Session() as sess:
                sess.run(init)
                y_est_np = sess.run(y_est, feed_dict={X: Xtest, y: ytest})

            correctly_followed = 0
            incorrectly_followed = 0
            missed_follows = 0
            correctly_skipped = 0

            for estimate, actual in zip(y_est_np, ytest.values):
                est = estimate.argmax(axis=0)
                # print(estimate)
                actual = actual.argmax(axis=0)
                if est == 1 and actual == 0: incorrectly_followed += 1
                elif est == 1 and actual == 1: correctly_followed += 1
                elif est == 0 and actual == 1: missed_follows += 1
                else: correctly_skipped += 1

            # correct = [estimate.argmax(axis=0) == target.argmax(axis=0) for estimate, target in zip(y_est_np, ytest.values)]
            total_followed = incorrectly_followed + correctly_followed

            total_correct = correctly_followed + correctly_skipped
            total_incorrect = incorrectly_followed + missed_follows

            # avoid crashing on division by zero when a category is empty
            try: total_accuracy = int(total_correct * 10000 / (total_correct + total_incorrect)) / 100
            except ZeroDivisionError: total_accuracy = 0

            total_skipped = correctly_skipped + missed_follows
            try: follow_accuracy = int(correctly_followed * 10000 / total_followed) / 100
            except ZeroDivisionError: follow_accuracy = 0
            try: skip_accuracy = int(correctly_skipped * 10000 / total_skipped) / 100
            except ZeroDivisionError: skip_accuracy = 0

            if print_progress: print('Correctly followed {} / {} ({}%), correctly skipped {} / {} ({}%)'.format(
                correctly_followed, total_followed, follow_accuracy, correctly_skipped, total_skipped, skip_accuracy))

            self.follow_accuracy = follow_accuracy

            if return_accuracy:
                return total_accuracy

        else:
            print('The model is not trained!')


    def make_prediction_on_normal_data(self, input_list):
        assert len(input_list) == len(self.input_labels), 'Incorrect number of inputs (had {} should have {})'.format(len(input_list), len(self.input_labels))
        # from ProcessData import normalize_list
        # normalize_list(input_list)
        input_array = np.array([input_list])

        X = tf.placeholder(shape=(1, len(input_list)), dtype=tf.float64, name='X')
        y = tf.placeholder(shape=(1, 2), dtype=tf.float64, name='y')
        W1 = tf.Variable(self.weights1)
        W2 = tf.Variable(self.weights2)
        A1 = tf.sigmoid(tf.matmul(X, W1))
        y_est = tf.sigmoid(tf.matmul(A1, W2))

        with tf.Session() as sess:
            sess.run(tf.global_variables_initializer())
            y_est_np = sess.run(y_est, feed_dict={X: input_array, y: self.create_blank_outputs()})
            predicted_value = y_est_np[0].argmax(axis=0)
            return predicted_value


    def make_prediction_on_abnormal_data(self, input_list):
        from ProcessData import normalize_list
        normalize_list(input_list)
        return self.make_prediction_on_normal_data(input_list)


    def create_blank_outputs(self):
        # the y placeholder is float64, so build the dummy outputs as floats directly
        blank_outputs = np.zeros(shape=(1, 2), dtype=np.float64)
        return blank_outputs
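
For reference, the transform I'm currently applying is just the following, broadcast over each input value (a minimal sketch using the example values above):

import numpy as np

normalize = lambda x: np.log(x + 1)  # the +1 avoids log(0); equivalent to np.log1p(x)

raw_inputs = np.array([0, 20000, 500, 10, 8], dtype=np.float64)
normalized_inputs = normalize(raw_inputs)  # NumPy applies this element-wise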

Upvotes: 1

Views: 1774

Answers (1)

ian

Reputation: 399

I don't see why you would want to create a layer for that. Preprocessing your inputs the way you are currently doing it is common practice. The log operator is quite common for skewed data, but there are other preprocessing options, such as sklearn's MinMaxScaler and StandardScaler:

https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.MinMaxScaler.html https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.StandardScaler.html

Those are just two other examples of ways to scale your data. There is also BatchNorm, but it is not recommended as the first layer of the network: the distribution of the input data is fixed and doesn't vary during training, so a static rescaling computed once up front does the same job.
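
For example, a minimal sketch of using StandardScaler this way (the toy arrays below are stand-ins for your Xtrain/Xtest; the key point is that the scaler is fit on the training data only and reused for the test data and later predictions):

import numpy as np
from sklearn.preprocessing import StandardScaler  # MinMaxScaler is used the same way

# toy stand-ins for the question's Xtrain / Xtest
Xtrain = np.array([[0, 20000, 500, 10, 8],
                   [3, 15000, 250, 12, 9]], dtype=np.float64)
Xtest = np.array([[1, 18000, 400, 11, 7]], dtype=np.float64)

scaler = StandardScaler()
Xtrain_scaled = scaler.fit_transform(Xtrain)  # learn mean/std from the training set
Xtest_scaled = scaler.transform(Xtest)        # reuse the training statistics

The same fitted scaler could then replace the normalize_list call in make_prediction_on_abnormal_data.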

Upvotes: 3
