Cici
Cici

Reputation: 23

How to convert a tensorflow model to a pytorch model?

I'm new to pytorch. Here's an architecture of a tensorflow model and I'd like to convert it into a pytorch model. enter image description here

I have done most of the codes but am confused about a few places.

1) In tensorflow, the Conv2D function takes filter as an input. However, in pytorch, the function takes the size of input channels and output channels as inputs. So how do I find the equivalent number of input channels and output channels, provided with the size of the filter.

2) In tensorflow, the dense layer has a parameter called 'nodes'. However, in pytorch, the same layer has 2 different inputs (the size of the input parameters and size of the targeted parameters), how do I determine them based on the number of the nodes.

Here's the tensorflow code.

from keras.utils import to_categorical
from keras.models import Sequential, load_model
from keras.layers import Conv2D, MaxPool2D, Dense, Flatten, Dropout

model = Sequential()
model.add(Conv2D(filters=32, kernel_size=(5,5), activation='relu', input_shape=X_train.shape[1:]))
model.add(Conv2D(filters=32, kernel_size=(5,5), activation='relu'))
model.add(MaxPool2D(pool_size=(2, 2)))
model.add(Dropout(rate=0.25))
model.add(Conv2D(filters=64, kernel_size=(3, 3), activation='relu'))
model.add(Conv2D(filters=64, kernel_size=(3, 3), activation='relu'))
model.add(MaxPool2D(pool_size=(2, 2)))
model.add(Dropout(rate=0.25))
model.add(Flatten())
model.add(Dense(256, activation='relu'))
model.add(Dropout(rate=0.5))
model.add(Dense(43, activation='softmax'))

Here's my code.:

import torch.nn.functional as F
import torch



# The network should inherit from the nn.Module
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        # Define 2D convolution layers
        # 3: input channels, 32: output channels, 5: kernel size, 1: stride
        self.conv1 = nn.Conv2d(3, 32, 5, 1)   # The size of input channel is 3 because all images are coloured
        self.conv2 = nn.Conv2d(32, 64, 5, 1)
        self.conv3 = nn.Conv2d(64, 128, 3, 1)
        self.conv3 = nn.Conv2d(128, 256, 3, 1)
        # It will 'filter' out some of the input by the probability(assign zero)
        self.dropout1 = nn.Dropout2d(0.25)
        self.dropout2 = nn.Dropout2d(0.5)
        # Fully connected layer: input size, output size
        self.fc1 = nn.Linear(36864, 128)
        self.fc2 = nn.Linear(128, 10)

    # forward() link all layers together,
    def forward(self, x):
        x = self.conv1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = F.relu(x)
        x = F.max_pool2d(x, 2)
        x = self.dropout1(x)
        x = self.conv3(x)
        x = F.relu(x)
        x = self.conv4(x)
        x = F.relu(x)
        x = F.max_pool2d(x, 2)
        x = self.dropout1(x)
        x = torch.flatten(x, 1)
        x = self.fc1(x)
        x = F.relu(x)
        x = self.dropout2(x)
        x = self.fc2(x)
        output = F.log_softmax(x, dim=1)
        return output

Thanks in advance!

Upvotes: 1

Views: 4281

Answers (2)

Qazi Fahim Farhan
Qazi Fahim Farhan

Reputation: 2186

I was trying to convert my tensorflow model into pytorch model. At first I had googled, and fould answers that use onnx. But I couldn't make it work. So I ended up doing this:

  1. Extend tensorflow layers, and override the def call() function to print logs. For example, LoggableDense like this:
timber = logging.getLogger()
logging.basicConfig(level=logging.INFO)


class LoggableDense(tf.keras.layers.Dense):
    def __init__(self, units: int, activation=None,
                 use_bias=True,
                 kernel_initializer="glorot_uniform",
                 kernel_regularizer=None,
                 id=-1
                 ):
        super().__init__(units, activation, use_bias, kernel_initializer, kernel_regularizer=kernel_regularizer)
        self.id = id
        pass

    def call(self, inputs):
        out = super().call(inputs)
        timber.info(f"{self.id} LMaxPooling1D: inputs.shape: {inputs.shape} inputs.value: {inputs}")
        timber.info(f"{self.id} LMaxPooling1D: inputs.shape: {out.shape} inputs.value: {out}---------------\n\n")
        return out

  1. Similarly, extend other layers. Now, replace the layers with loggable layers. In my case, the model looks like this:
def create_loggable_CNNBasic_model(sequence_shape, filters, kernel_size):
    # input layers
    forward_input = keras.Input(shape=(sequence_shape[1], sequence_shape[2]), name='forward')
    reverse_input = keras.Input(shape=(sequence_shape[1], sequence_shape[2]), name='reverse')

    first_layer = LoggableConv1d(filters=filters, kernel_size=kernel_size, activation="relu",
                                 input_shape=(sequence_shape[1], sequence_shape[2]), id="1")
    fw = first_layer(forward_input)
    bw = first_layer(reverse_input)
    print("after Conv1D fw shape----------------", fw)
    print("after Conv1D bw shape----------------", bw)
    concat = loggable_concatenate([fw, bw], axis=1)
    print("Concat shape-----------------", concat.shape)
    pool_size_input = concat.shape[1]

    pool_layer = LoggableMaxPooling1D(pool_size=pool_size_input, id="3")(concat)
    # model.add(pool_layer)
    print("After Maxpooling shape-----------------", pool_layer.shape)
    flat = LoggableFlatten(id="4")(pool_layer)
    # model.add(flat)

    after_flat = LoggableDense(32, activation="relu", id="5")(flat)
    # model.add(after_flat)

    outputs = LoggableDense(1, kernel_initializer='normal', kernel_regularizer=regularizers.l2(0.001),
                            activation='sigmoid', id="6", is_end=True)(after_flat)

    model = keras.Model(inputs=[forward_input, reverse_input], outputs=outputs)

    model.compile(loss='mean_squared_error', optimizer='adam', metrics=['accuracy'])
    return model
  1. Now run the model with a small input. this will print some logs in the console. your console should look like this:
INFO:root:------------start---------------

1 LConv1D: inputs.shape: (None, 2000, 4), inputs.value: Tensor("Placeholder:0", shape=(None, 2000, 4), dtype=float32)
INFO:root:1 LConv1D: outputs.shape: (None, 1991, 10), outputs.value: Tensor("loggable_conv1d/Relu:0", shape=(None, 1991, 10), dtype=float32)---------------


INFO:root:------------start---------------

1 LConv1D: inputs.shape: (None, 2000, 4), inputs.value: Tensor("Placeholder:0", shape=(None, 2000, 4), dtype=float32)
INFO:root:1 LConv1D: outputs.shape: (None, 1991, 10), outputs.value: Tensor("loggable_conv1d/Relu:0", shape=(None, 1991, 10), dtype=float32)---------------


after Conv1D fw shape---------------- KerasTensor(type_spec=TensorSpec(shape=(None, 1991, 10), dtype=tf.float32, name=None), name='loggable_conv1d/Relu:0', description="created by layer 'loggable_conv1d'")
after Conv1D bw shape---------------- KerasTensor(type_spec=TensorSpec(shape=(None, 1991, 10), dtype=tf.float32, name=None), name='loggable_conv1d/Relu:0', description="created by layer 'loggable_conv1d'")
INFO:root:2 LConcatenate: input.size 2, inputs.value: [<tf.Tensor 'Placeholder:0' shape=(None, 1991, 10) dtype=float32>, <tf.Tensor 'Placeholder_1:0' shape=(None, 1991, 10) dtype=float32>]
INFO:root:2 LConcatenate: outputs.shape: (None, 3982, 10), outputs.size= Tensor("loggable_concatenate/strided_slice:0", shape=(), dtype=int32) outputs.value: Tensor("loggable_concatenate/concat:0", shape=(None, 3982, 10), dtype=float32)---------------


Concat shape----------------- (None, 3982, 10)
INFO:root:3 LMaxPooling1D: inputs.shape: (None, 3982, 10), inputs.value: Tensor("Placeholder:0", shape=(None, 3982, 10), dtype=float32)
INFO:root:3 LMaxPooling1D: outputs.shape: (None, 1, 10), outputs.value: Tensor("loggable_max_pooling1d/Squeeze:0", shape=(None, 1, 10), dtype=float32)---------------


After Maxpooling shape----------------- (None, 1, 10)
INFO:root:4 LFlatten: inputs.shape: (None, 1, 10), inputs.value: Tensor("Placeholder:0", shape=(None, 1, 10), dtype=float32)
INFO:root:4 LFlatten: outputs.shape: (None, 10), outputs.value: Tensor("loggable_flatten/Reshape:0", shape=(None, 10), dtype=float32)---------------


INFO:root:5 LMaxPooling1D: inputs.shape: (None, 10) inputs.value: Tensor("Placeholder:0", shape=(None, 10), dtype=float32)
INFO:root:5 LMaxPooling1D: inputs.shape: (None, 32) inputs.value: Tensor("loggable_dense/Relu:0", shape=(None, 32), dtype=float32)---------------


INFO:root:6 LMaxPooling1D: inputs.shape: (None, 32) inputs.value: Tensor("Placeholder:0", shape=(None, 32), dtype=float32)
INFO:root:6 LMaxPooling1D: inputs.shape: (None, 1) inputs.value: Tensor("loggable_dense_1/Sigmoid:0", shape=(None, 1), dtype=float32)---------------


INFO:root:---------END---------
  1. As you can see, the logs show the shape of input vs output for each layer. Use these shapes as reference to build your pytorch model.

  2. Next I wrote a pytorch model, and ran on the same sample data. I got error in first time, such as matrix multiplication errors. I fixed those by tweaking in_channels, and out_channels. Keep tweaking until the input and output pytorch.tensors have same shape as in step 3. Finally I ended up with a model like this:

class CnnBasicPytorchModel(nn.Module):
    # property variables
    seq_len: int
    in_channel: int
    num_filters: int
    kernel_size: int


    def __init__(self, seq_len, in_channel=4, num_filters=10, kernel_size=10,  *args,
                 **kwargs):
        super().__init__(*args, **kwargs)
        self.seq_len = seq_len
        self.in_channel = in_channel
        self.num_filters = num_filters
        self.kernel_size = kernel_size

        # Init the layers
        self.conv1d_01 = nn.Conv1d(in_channels=self.in_channel, out_channels=self.num_filters,
                                   kernel_size=self.kernel_size)
        # l01: hf: shape: torch.Size([10, 32, 98])
        self.concat_02 = torch.concat
        # l02: h: shape: torch.Size([10, 64, 98])
        # self.relu_03 = nn.ReLU(inplace=True)
        # l03: h: shape: torch.Size([10, 64, 98])
        self.max_pooling_04 = nn.MaxPool1d(kernel_size=self.kernel_size)  # but why 2?
        # l04: h: shape: torch.Size([10, 64, 98])

        self.flatten_08 = nn.Flatten()
        # INFO: root:l08: h: shape: torch.Size([10, 48])

        self.dense_10 = nn.Linear(in_features= ((seq_len - kernel_size + 1) * 2 // 10) * 10, out_features=1024)
        self.activation_11 = nn.ReLU(inplace=True)

        self.output_dense_12 = nn.Linear(in_features=1024, out_features=1)
        self.output_activation_13 = nn.Sigmoid()

    def forward(self, xf, xb):
        hf = self.conv1d_01(xf)
        timber.debug(constants.green + f"l01 conv1d_01: hf: shape: {hf.shape}")
        hb = self.conv1d_01(xb)
        timber.debug(constants.green + f"l01 conv1d_01: hb: shape: {hb.shape}")
        h = self.concat_02(tensors=(hf, hb), dim=2)
        timber.debug(constants.green + f"l02 concat_02: h: shape: {h.shape}")
        # h = self.relu_03(h)
        # timber.debug(constants.green + f"l03 relu_03: h: shape: {h.shape}")
        h = self.max_pooling_04(h)
        timber.debug(constants.green + f"l04 max_pooling_04: h: shape: {h.shape}")

        h = self.flatten_08(h)
        timber.debug(constants.magenta + f"l08 flatten_08: h: shape: {h.shape}")
        h = self.dense_10(h)
        timber.debug(constants.green + f"l10 dense_10: h: shape: {h.shape}")
        h = self.activation_11(h)
        timber.debug(constants.green + f"l11 activation_11: h: shape: {h.shape}")
        h = self.output_dense_12(h)
        timber.debug(constants.green + f"l12 output_dense_12: h: shape: {h.shape}")
        h = self.output_activation_13(h)
        timber.debug(constants.green + f"l13 output_activation_13: h: shape: {h.shape}")
        return h

Here is my complete code:

import os

from keras.src import regularizers

os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

import tensorflow as tf
import tensorflow.keras as keras
import torch
import torch.nn as nn
import logging

timber = logging.getLogger()
logging.basicConfig(level=logging.INFO)

# class Constants:
black = "\u001b[30m"
red = "\u001b[31m"
green = "\u001b[32m"
yellow = "\u001b[33m"
blue = "\u001b[34m"
magenta = "\u001b[35m"
cyan = "\u001b[36m"
white = "\u001b[37m"

# Conv1D
# Relu
# MaxPooling1D
# TimeDistributed
# Flatten
# LSTM
# Bidirectional
# Dense
#  Lambda
# concatenate

class LoggableConcatenate(tf.keras.layers.Concatenate):
    def __init__(self, id, **kwargs):
        super().__init__(**kwargs)
        self.id = id

    def call(self, inputs):
        timber.info(
            green + f"{self.id} LConcatenate: input.size {len(inputs)}, inputs.value: {inputs}")
        out = super().call(inputs)
        timber.info(
            red + f"{self.id} LConcatenate: outputs.shape: {out.shape}, outputs.size= {len(out)} outputs.value: {out}---------------\n\n")
        return out


def loggable_concatenate(inputs, axis=-1, **kwargs):
    return LoggableConcatenate(axis=axis, id=2, **kwargs)(inputs)


class LoggableConv1d(tf.keras.layers.Conv1D):
    def __init__(self, filters, kernel_size, id, **kwargs):
        super().__init__(filters, kernel_size, **kwargs)
        self.id = id

    def call(self, inputs):
        timber.info(
            green + f"------------start---------------\n\n{self.id} LConv1D: inputs.shape: {inputs.shape}, inputs.value: {inputs}")
        out = super().call(inputs)
        timber.info(
            red + f"{self.id} LConv1D: outputs.shape: {out.shape}, outputs.value: {out}---------------\n\n")
        return out


class LoggableReLU(tf.keras.layers.ReLU):
    def __init__(self, id):
        super().__init__()
        self.id = id

    def call(self, inputs):
        timber.info(blue +
                    f"{self.id} LReLU: inputs.shape: {inputs.shape}, inputs.value: {inputs}")
        out = super().call(inputs)
        timber.info(magenta +
                    f"{self.id} LReLU: outputs.shape: {out.shape}, outputs.value: {out}---------------\n\n")
        return out


class LoggableMaxPooling1D(tf.keras.layers.MaxPooling1D):
    def __init__(self, id, **kwargs):
        super().__init__(**kwargs)
        self.id = id

    def call(self, inputs):
        timber.info(magenta +
                    f"{self.id} LMaxPooling1D: inputs.shape: {inputs.shape}, inputs.value: {inputs}")
        out = super().call(inputs)
        timber.info(cyan +
                    f"{self.id} LMaxPooling1D: outputs.shape: {out.shape}, outputs.value: {out}---------------\n\n")
        return out


class LoggableTimeDistributed(tf.keras.layers.TimeDistributed):
    def __init__(self, layer, id, **kwargs):
        super().__init__(layer, **kwargs)
        self.id = id

    def call(self, inputs, training=None, mask=None):
        out = super().call(inputs, training, mask)
        timber.info(yellow +
                    f"{self.id} LTimeDistributed: inputs.shape: {inputs.shape}, inputs.value: {inputs}")
        timber.info(red +
                    f"{self.id} LTimeDistributed: outputs.shape: {out.shape}, outputs.value: {out}---------------\n\n")
        return out


class LoggableFlatten(tf.keras.layers.Flatten):
    def __init__(self, id, **kwargs):
        super().__init__(**kwargs)
        self.id = id

    def call(self, inputs):
        out = super().call(inputs)
        timber.info(green +
                    f"{self.id} LFlatten: inputs.shape: {inputs.shape}, inputs.value: {inputs}")
        timber.info(magenta +
                    f"{self.id} LFlatten: outputs.shape: {out.shape}, outputs.value: {out}---------------\n\n")
        return out


class LoggableLSTM(tf.keras.layers.LSTM):
    def __init__(self, units, **kwargs):
        super().__init__(units, **kwargs)
        self.id = 10

    def call(self, inputs, mask=None, training=None, initial_state=None):
        out = super().call(inputs, mask, training, initial_state)
        timber.info(blue +
                    f"{self.id} LLSTM: inputs.shape: {inputs.shape}, inputs.value: {inputs}")
        timber.info(red +
                    f"{self.id} LLSTM: outputs.shape: {out.shape}, outputs.value: {out}---------------\n\n")
        return out


class LoggableBidirectional(tf.keras.layers.Bidirectional):
    def __init__(self, layer, id, **kwargs):
        super().__init__(layer, **kwargs)
        self.id = id

    def call(
            self,
            inputs,
            training=None,
            mask=None,
            initial_state=None,
            constants=None,
    ):
        out = super().call(inputs, training, mask, initial_state, constants)
        timber.info(magenta +
                    f"{self.id} LBidirectional: inputs.shape: {inputs.shape}, inputs.value: {inputs}")
        timber.info(cyan +
                    f"{self.id} LBidirectional: out.shape: {out.shape} outputs.value: {out}---------------\n\n")
        return out


class LoggableDense(tf.keras.layers.Dense):
    def __init__(self, units: int, activation=None,
                 use_bias=True,
                 kernel_initializer="glorot_uniform",
                 kernel_regularizer=None,
                 id=-1
                 ):
        super().__init__(units, activation, use_bias, kernel_initializer, kernel_regularizer=kernel_regularizer)
        self.id = id
        pass

    def call(self, inputs):
        out = super().call(inputs)
        timber.info(blue +
                    f"{self.id} LMaxPooling1D: inputs.shape: {inputs.shape} inputs.value: {inputs}")
        timber.info(green +
                    f"{self.id} LMaxPooling1D: inputs.shape: {out.shape} inputs.value: {out}---------------\n\n")
        return out


class LoggableLambda(tf.keras.layers.Lambda):
    def __init__(self, function, id, **kwargs):
        super().__init__(function, **kwargs)
        self.id = id

    def call(self, inputs, mask=None, training=None):
        out = super().call(inputs, mask, training)
        timber.info(cyan +
                    f"{self.id} LLambda: inputs.shape: {inputs.shape} inputs.value: {inputs}")
        timber.info(magenta +
                    f"{self.id} LLambda: outputs.shape: {out.shape} outputs.value: {out}---------------\n\n")
        return out

def create_loggable_CNNBasic_model(sequence_shape, filters, kernel_size):
    # input layers
    forward_input = keras.Input(shape=(sequence_shape[1], sequence_shape[2]), name='forward')
    reverse_input = keras.Input(shape=(sequence_shape[1], sequence_shape[2]), name='reverse')

    first_layer = LoggableConv1d(filters=filters, kernel_size=kernel_size, activation="relu",
                                 input_shape=(sequence_shape[1], sequence_shape[2]), id="1")
    fw = first_layer(forward_input)
    bw = first_layer(reverse_input)
    print("after Conv1D fw shape----------------", fw)
    print("after Conv1D bw shape----------------", bw)
    concat = loggable_concatenate([fw, bw], axis=1)
    print("Concat shape-----------------", concat.shape)
    pool_size_input = concat.shape[1]

    pool_layer = LoggableMaxPooling1D(pool_size=pool_size_input, id="3")(concat)
    # model.add(pool_layer)
    print("After Maxpooling shape-----------------", pool_layer.shape)
    flat = LoggableFlatten(id="4")(pool_layer)
    # model.add(flat)

    after_flat = LoggableDense(32, activation="relu", id="5")(flat)
    # model.add(after_flat)

    outputs = LoggableDense(1, kernel_initializer='normal', kernel_regularizer=regularizers.l2(0.001),
                            activation='sigmoid', id="6", is_end=True)(after_flat)

    model = keras.Model(inputs=[forward_input, reverse_input], outputs=outputs)

    model.compile(loss='mean_squared_error', optimizer='adam', metrics=['accuracy'])
    return model


class CnnBasicPytorchModel(nn.Module):
    # property variables
    seq_len: int
    in_channel: int
    num_filters: int
    kernel_size: int


    def __init__(self, seq_len, in_channel=4, num_filters=10, kernel_size=10,  *args,
                 **kwargs):
        super().__init__(*args, **kwargs)
        self.seq_len = seq_len
        self.in_channel = in_channel
        self.num_filters = num_filters
        self.kernel_size = kernel_size

        # Init the layers
        self.conv1d_01 = nn.Conv1d(in_channels=self.in_channel, out_channels=self.num_filters,
                                   kernel_size=self.kernel_size)
        # l01: hf: shape: torch.Size([10, 32, 98])
        self.concat_02 = torch.concat
        # l02: h: shape: torch.Size([10, 64, 98])
        # self.relu_03 = nn.ReLU(inplace=True)
        # l03: h: shape: torch.Size([10, 64, 98])
        self.max_pooling_04 = nn.MaxPool1d(kernel_size=self.kernel_size)  # but why 2?
        # l04: h: shape: torch.Size([10, 64, 98])

        self.flatten_08 = nn.Flatten()
        # INFO: root:l08: h: shape: torch.Size([10, 48])

        self.dense_10 = nn.Linear(in_features= ((seq_len - kernel_size + 1) * 2 // 10) * 10, out_features=1024)
        self.activation_11 = nn.ReLU(inplace=True)

        self.output_dense_12 = nn.Linear(in_features=1024, out_features=1)
        self.output_activation_13 = nn.Sigmoid()

    def forward(self, xf, xb):
        hf = self.conv1d_01(xf)
        timber.debug(green + f"l01 conv1d_01: hf: shape: {hf.shape}")
        hb = self.conv1d_01(xb)
        timber.debug(green + f"l01 conv1d_01: hb: shape: {hb.shape}")
        h = self.concat_02(tensors=(hf, hb), dim=2)
        timber.debug(green + f"l02 concat_02: h: shape: {h.shape}")
        # h = self.relu_03(h)
        # timber.debug(green + f"l03 relu_03: h: shape: {h.shape}")
        h = self.max_pooling_04(h)
        timber.debug(green + f"l04 max_pooling_04: h: shape: {h.shape}")

        h = self.flatten_08(h)
        timber.debug(magenta + f"l08 flatten_08: h: shape: {h.shape}")
        h = self.dense_10(h)
        timber.debug(green + f"l10 dense_10: h: shape: {h.shape}")
        h = self.activation_11(h)
        timber.debug(green + f"l11 activation_11: h: shape: {h.shape}")
        h = self.output_dense_12(h)
        timber.debug(green + f"l12 output_dense_12: h: shape: {h.shape}")
        h = self.output_activation_13(h)
        timber.debug(green + f"l13 output_activation_13: h: shape: {h.shape}")
        return h

Upvotes: 0

Mike
Mike

Reputation: 1539

1) In pytorch, we take input channels and output channels as an input. In your first layer, the input channels will be the number of color channels in your image. After that it's always going to be the same as the output channels from your previous layer (output channels are specified by the filters parameter in Tensorflow).

2). Pytorch is slightly annoying in the fact that when flattening your conv outputs you'll have to calculate the shape yourself. You can either use an equation to calculate this (𝑂𝑢𝑡=(𝑊−𝐹+2𝑃)/𝑆+1), or make a shape calculating function to get the shape of a dummy image after it's been passed through the conv part of the network. This parameter will be your size of input argument; the size of your output argument will just be the number of nodes you want in your next fully connected layer.

Upvotes: 2

Related Questions