Craig

Reputation: 177

How to print out the data that goes to keras model.fit, specifically when using a petastorm dataset

Update

While I appreciated AloneTogether's answer, I didn't like that it relied on take() and was separate from model.fit.

I've posted another answer below if you want to look at it. It involves subclassing Model, and it's not too bad.

End of Update

I have a simple example: a parquet file with 8 columns named feature_1 through feature_8, each populated with the values 1 to 100:

    feature_1      feature_2     ...      feature_8
    1              1                      1
    2              2                      2
    ...            ...                    ...
    99             99                     99
    100            100                    100
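For reference, a minimal sketch of how such a file could be generated with pandas (data_dir here is just a placeholder for whatever directory the reader code below points at):

import numpy as np
import pandas as pd

data_dir = "."  # placeholder; any writable directory works

# eight identical columns, each holding the integers 1..100
df = pd.DataFrame({"feature_%d" % i: np.arange(1, 101) for i in range(1, 9)})

# write it out as a single parquet file (needs pyarrow or fastparquet installed)
df.to_parquet("%s/df_100.parquet" % data_dir)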

my model:

import tensorflow as tf
from tensorflow.keras.layers import Input, Dense, Concatenate

all_cols = ["feature_1","feature_2","feature_3","feature_4","feature_5","feature_6","feature_7","feature_8"]
x_cols = ["feature_1","feature_2","feature_3","feature_4","feature_5","feature_6","feature_7"]


inputs = [Input(shape=(1,), name=col) for col in x_cols]
merged = Concatenate(axis=1)(inputs)
x = Dense(50, activation="relu")(merged)
x = Dense(20, activation="relu")(x)
outputs = Dense(101, activation="softmax")(x)
model = tf.keras.Model(inputs=inputs, outputs=outputs)
opt = tf.keras.optimizers.Adam(learning_rate=.001)

model.compile(loss="sparse_categorical_crossentropy",
              optimizer=opt, metrics=['accuracy'])

I use petastorm like so:

from petastorm.reader import make_batch_reader
from petastorm.tf_utils import make_petastorm_dataset

batch_size = 4

with make_batch_reader('%s/df_100.parquet' % data_dir, num_epochs=1,
                       schema_fields=all_cols) as train_reader:
    with make_batch_reader('%s/df_100.parquet' % data_dir, num_epochs=1,
                           schema_fields=all_cols) as val_reader:

        train_ds = make_petastorm_dataset(train_reader) \
                        .unbatch() \
                        .map(
                        lambda x: (tuple(getattr(x, col) for col in x_cols), getattr(x, "feature_8"))
                        ) \
                        .batch(batch_size)

        val_ds = make_petastorm_dataset(val_reader) \
                        .unbatch() \
                        .map(
                        lambda x: (tuple(getattr(x, col) for col in x_cols),
                                   getattr(x, "feature_8"))
                        ) \
                        .batch(batch_size)

For this simple example I use the same data for training as for validation. I want to confirm that the whole dataset is going into model.fit(), so I write a custom callback:

class MyCustomCallback(tf.keras.callbacks.Callback):
  def __init__(self, train_data):
    super().__init__()
    self.mylist = []
    self.train_data = train_data

  def on_train_batch_begin(self, batch, logs=None):
    # peek at one batch of the training dataset at the start of every batch
    print(list(self.train_data.take(1).as_numpy_iterator())[0][0][0])

# and I pass the dataset to the custom callback:
callbacks = []
callbacks.append(MyCustomCallback(train_ds))

This doesn't print all the values (1 to 100). If I iterate over the dataset with a simple for loop, without model.fit, then I do get all the values 1 to 100, so I think the take() is competing with model.fit (just a theory).
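For illustration, the kind of plain iteration I mean (a sketch, assuming train_ds is built as above); this does print every value:

# iterating the dataset outside of model.fit yields every value 1..100
for features, label in train_ds:
    print(features[0].numpy())  # first input column of the batch (feature_1)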

I have also tried:

class MyCustomCallback(tf.keras.callbacks.Callback):

  def on_train_batch_begin(self, batch, logs=None):
    print(self.model.layers[0].input) # or .output
    #or
    #print(self.model.layers[0].get_weights())


But this doesn't get me any real values, and get_weights() prints out empty arrays.

This is what printing the input prints out:

KerasTensor(type_spec=TensorSpec(shape=(None, 1), dtype=tf.float32, name='feature_1'), name='feature_1', description="created by layer 'feature_1'")

I have also tried using K.eval() on the input and output of the layer, and that ends up with a numpy error that is not fixed by any of the eager settings.

I really don't think this should be so hard. I just want to peek at the dataset before it goes into training.

I have fooled around with repeat(), cache(), and simply iterating over the dataset before model.fit, but I don't like that the inspection happens before model.fit, and that unless the dataset is cached it gets re-read and reshuffled, etc.
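For what it's worth, a rough sketch of the cache() variant I mean (just an assumption about the wiring, not something I'm happy with): the first pass fills the cache, and fit() then replays the same elements instead of re-reading the source.

cached_ds = train_ds.cache()

# first pass fills the cache and lets me peek at every batch
for features, label in cached_ds:
    print(features[0].numpy())

# fit() then replays the cached elements rather than re-reading the parquet file
model.fit(cached_ds, epochs=1)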

But I also want to be able to arbitrarily look at the model, any value, any weight, at any time. I don't feel like I can access this stuff, but I feel like I should be able to.

Any help is appreciated.

Oh, and I'm using TensorFlow 2.6.2 with tf.keras at the moment.

Upvotes: 2

Views: 2834

Answers (2)

Craig

Reputation: 177

So this is my own answer, after some trial and error. Hope this helps you, because I couldn't find the answer easily.

First, subclass Model:

class CustomModel(tf.keras.Model):

    # normally wouldn't have to define __init__, but we create a variable "mylist"
    def __init__(self, inputs, outputs):
        super().__init__(inputs, outputs)
        self.mylist = []

    def train_step(self, data):
        # Unpack the data. Its structure depends on your model and
        # on what you pass to `fit()`.
        x, y = data
        self.mylist.append(x[0].numpy())  # <<----- Everything here is standard except this

        with tf.GradientTape() as tape:
            y_pred = self(x, training=True)  # Forward pass
            # Compute the loss value
            # (the loss function is configured in `compile()`)
            loss = self.compiled_loss(y, y_pred, regularization_losses=self.losses)

        # Compute gradients
        trainable_vars = self.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)
        # Update weights
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))
        # Update metrics (includes the metric that tracks the loss)
        self.compiled_metrics.update_state(y, y_pred)
        # Return a dict mapping metric names to current value
        return {m.name: m.result() for m in self.metrics}

Then make sure you use run_eagerly=True in model.compile():

inputs = [Input(shape=(1,),name=col) for col in x_cols]
merged = Concatenate(axis=1)(inputs)
x = Dense(50, activation="relu")(merged)
x = Dense(20,activation="relu")(x)
outputs = Dense(101,activation="softmax")(x)
model = CustomModel(inputs=inputs, outputs=outputs)  # <<--- use custom model
opt = tf.keras.optimizers.Adam(learning_rate=.001)

# notice the run_eagerly; this must be set for Keras (not just TensorFlow)
# to process things the way plain Python would
model.compile(loss="sparse_categorical_crossentropy",\
                      optimizer=opt,metrics=['accuracy'],run_eagerly=True)

Then, finally, do some stuff in a custom callback:

import itertools

class MyCustomCallback(tf.keras.callbacks.Callback):

  def on_epoch_end(self, epoch, logs=None):
    # I'm sure this could be written better, but this turns a ListWrapper of
    # np.ndarrays into a normal list of lists
    mylist = [item.tolist() for item in list(self.model.mylist)]

    # and then flatten the list so we can sort it
    flat_list = list(itertools.chain(*mylist))
    flat_list.sort()

    # if these are equal then we saw 1-100 in our input
    print(list(range(1, 101)) == flat_list)
    # or just print the list out, of course
    print(flat_list)

    # and finally remember to reset the model's mylist after the epoch
    self.model.mylist = []
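Putting it together, training is then the usual fit() call with the callback attached (a sketch, assuming train_ds and val_ds are built as in the question):

# train as usual; mylist fills up in train_step and is checked/cleared per epoch
model.fit(train_ds, validation_data=val_ds, epochs=1,
          callbacks=[MyCustomCallback()])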

Upvotes: 0

AloneTogether

Reputation: 26708

I think it all depends on your batch_size, because take(1) takes one batch, and if the batch_size is < 100 you will not see all the values. If, for example, you have batch_size=100, then you will definitely see the values 1 to 100:

import pandas as pd
import tensorflow as tf
import numpy as np
from petastorm.tf_utils import make_petastorm_dataset
from petastorm.reader import make_batch_reader

df = pd.DataFrame({'feature1': np.arange(1, 101),
                   'feature2': np.arange(1, 101),
                   'feature3': np.arange(1, 101),
                   'feature4': np.arange(1, 101),
                   'feature5': np.arange(1, 101),
                   'feature6': np.arange(1, 101),
                   'feature7': np.arange(1, 101),
                   'feature8': np.arange(1, 101)})
columns = list(df)
df.to_parquet('file.parquet')
x_cols = columns[:-1]
batch_size = 100

class MyCustomCallback(tf.keras.callbacks.Callback):
  def __init__(self, train_data):
    self.mylist = []
    self.train_data = train_data

  def on_train_batch_begin(self, batch, logs=None):
    tf.print(list(self.train_data.take(1).as_numpy_iterator())[0][0][0])


with make_batch_reader('file:///content/file.parquet', num_epochs=1,
                                   schema_fields=columns) as train_reader:
  train_ds = make_petastorm_dataset(train_reader) \
                        .unbatch() \
                        .map( 
                        lambda x: (tuple(getattr(x, col) for col in x_cols),getattr(x,"feature8"))
                        ) \
                        .shuffle(buffer_size=1000).batch(batch_size)
                        
  inputs = [tf.keras.layers.Input(shape=(1,),name=col) for col in x_cols]
  merged = tf.keras.layers.Concatenate(axis=1)(inputs)
  x = tf.keras.layers.Dense(50, activation="relu")(merged)
  x = tf.keras.layers.Dense(20,activation="relu")(x)
  outputs = tf.keras.layers.Dense(101, activation="softmax")(x)
  model = tf.keras.Model(inputs=inputs, outputs=outputs)
  opt = tf.keras.optimizers.Adam(learning_rate=.001)

  model.compile(loss="sparse_categorical_crossentropy", optimizer=opt,metrics=['accuracy'])
  model.fit(train_ds, epochs=2, callbacks=[MyCustomCallback(train_ds)])

which prints:

Epoch 1/2
array([  1,   2,   3,   4,   5,   6,   7,   8,   9,  10,  11,  12,  13,
        14,  15,  16,  17,  18,  19,  20,  21,  22,  23,  24,  25,  26,
        27,  28,  29,  30,  31,  32,  33,  34,  35,  36,  37,  38,  39,
        40,  41,  42,  43,  44,  45,  46,  47,  48,  49,  50,  51,  52,
        53,  54,  55,  56,  57,  58,  59,  60,  61,  62,  63,  64,  65,
        66,  67,  68,  69,  70,  71,  72,  73,  74,  75,  76,  77,  78,
        79,  80,  81,  82,  83,  84,  85,  86,  87,  88,  89,  90,  91,
        92,  93,  94,  95,  96,  97,  98,  99, 100])
      1/Unknown - 1s 777ms/step - loss: 19.3339 - accuracy: 0.0100array([  1,   2,   3,   4,   5,   6,   7,   8,   9,  10,  11,  12,  13,
        14,  15,  16,  17,  18,  19,  20,  21,  22,  23,  24,  25,  26,
        27,  28,  29,  30,  31,  32,  33,  34,  35,  36,  37,  38,  39,
        40,  41,  42,  43,  44,  45,  46,  47,  48,  49,  50,  51,  52,
        53,  54,  55,  56,  57,  58,  59,  60,  61,  62,  63,  64,  65,
        66,  67,  68,  69,  70,  71,  72,  73,  74,  75,  76,  77,  78,
        79,  80,  81,  82,  83,  84,  85,  86,  87,  88,  89,  90,  91,
        92,  93,  94,  95,  96,  97,  98,  99, 100])
1/1 [==============================] - 1s 899ms/step - loss: 19.3339 - accuracy: 0.0100
...

Also, I am not sure what exactly the benefits of petastorm are, but if you are looking for an alternative, you could try tensorflow-io:

import pandas as pd
import tensorflow_io as tfio
import tensorflow as tf
import numpy as np

df = pd.DataFrame({'feature1': np.arange(1, 101),
                   'feature2': np.arange(1, 101),
                   'feature3': np.arange(1, 101),
                   'feature4': np.arange(1, 101),
                   'feature5': np.arange(1, 101),
                   'feature6': np.arange(1, 101),
                   'feature7': np.arange(1, 101),
                   'feature8': np.arange(1, 101)})
columns = list(df)
df.to_parquet('file.parquet')
ds = tfio.IODataset.from_parquet('file.parquet', columns = columns)
x_cols = columns[:-1]
batch_size = 100

train_ds = ds.map(lambda x: (tuple(x[col] for col in x_cols),x["feature8"])).shuffle(buffer_size=1000).batch(batch_size)
inputs = [tf.keras.layers.Input(shape=(1,),name=col) for col in x_cols]
merged = tf.keras.layers.Concatenate(axis=1)(inputs)
x = tf.keras.layers.Dense(50, activation="relu")(merged)
x = tf.keras.layers.Dense(20,activation="relu")(x)
outputs = tf.keras.layers.Dense(101, activation="softmax")(x)
model = tf.keras.Model(inputs=inputs, outputs=outputs)
opt = tf.keras.optimizers.Adam(learning_rate=.001)

model.compile(loss="sparse_categorical_crossentropy", optimizer=opt,metrics=['accuracy'])
model.fit(train_ds, epochs=2, callbacks=[MyCustomCallback(train_ds)])

Update 1: You can add each batch to an array in the callback, and at the end of each epoch you can print the values and reset the array for the next epoch:

class MyCustomCallback(tf.keras.callbacks.Callback):
  def __init__(self, train_data):
    self.mylist = tf.TensorArray(dtype=tf.int32, size=0, dynamic_size=True, infer_shape=True)
    self.train_data = train_data

  def on_batch_end(self, batch, logs=None):
    self.mylist = self.mylist.write(self.mylist.size(), list(self.train_data.take(1).as_numpy_iterator())[0][0][0])
  
  def on_epoch_end(self, epoch, logs=None):
    arr = self.mylist.stack()
    tf.print(arr, summarize=-1)
    self.mylist = tf.TensorArray(dtype=tf.int32, size=0, dynamic_size=True, infer_shape=True)
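Usage is the same as before (a sketch under the same setup as the first snippet); since on_batch_end pulls take(1) once per step, the array printed at the end of the epoch holds one peeked batch per training step:

model.fit(train_ds, epochs=2, callbacks=[MyCustomCallback(train_ds)])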

Upvotes: 2
