hmghaly

Reputation: 1502

Using RNN Trained Model without pytorch installed

I have trained an RNN model with pytorch. I need to use the model for prediction in an environment where I'm unable to install pytorch because of some strange dependency issue with glibc. However, I can install numpy and scipy and other libraries. So, I want to use the trained model, with the network definition, without pytorch.

I have the weights of the model as I save the model with its state dict and weights in the standard way, but I can also save it using just json/pickle files or similar.

I also have the network definition, which depends on pytorch in a number of ways. This is my RNN network definition.

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import random

torch.manual_seed(1)
random.seed(1)
device = torch.device('cpu')

class RNN(nn.Module):
  def __init__(self, input_size, hidden_size, output_size,num_layers, matching_in_out=False, batch_size=1):
    super(RNN, self).__init__()
    self.input_size = input_size
    self.hidden_size = hidden_size
    self.output_size = output_size
    self.num_layers = num_layers
    self.batch_size = batch_size
    self.matching_in_out = matching_in_out #length of input vector matches the length of output vector 
    self.lstm = nn.LSTM(input_size, hidden_size,num_layers)
    self.hidden2out = nn.Linear(hidden_size, output_size)
    self.hidden = self.init_hidden()
  def forward(self, feature_list):
    feature_list=torch.tensor(feature_list)
    
    if self.matching_in_out:
      lstm_out, _ = self.lstm( feature_list.view(len( feature_list), 1, -1))
      output_space = self.hidden2out(lstm_out.view(len( feature_list), -1))
      output_scores = torch.sigmoid(output_space) #we'll need to check if we need this sigmoid
      return output_scores #output_scores
    else:
      for i in range(len(feature_list)):
        cur_ft_tensor=feature_list[i]#.view([1,1,self.input_size])
        cur_ft_tensor=cur_ft_tensor.view([1,1,self.input_size])
        lstm_out, self.hidden = self.lstm(cur_ft_tensor, self.hidden)
        outs=self.hidden2out(lstm_out)
      return outs
  def init_hidden(self):
    #return torch.rand(self.num_layers, self.batch_size, self.hidden_size)
    return (torch.rand(self.num_layers, self.batch_size, self.hidden_size).to(device),
            torch.rand(self.num_layers, self.batch_size, self.hidden_size).to(device))

I am aware of this question, but I'm willing to go as low level as possible. I can work with numpy array instead of tensors, and reshape instead of view, and I don't need a device setting.

Based on the class definition above, I only need the following components from torch to get an output from the forward function: nn.LSTM, nn.Linear, and torch.sigmoid.

I think I can easily implement the sigmoid function using numpy. However, can I have some implementation for the nn.LSTM and nn.Linear using something not involving pytorch? Also, how will I use the weights from the state dict into the new class?

So, the question is, how can I "translate" this RNN definition into a class that doesn't need pytorch, and how to use the state dict weights for it? Alternatively, is there a "light" version of pytorch, that I can use just to run the model and yield a result?

EDIT

I think it might be useful to include the numpy/scipy equivalent for both nn.LSTM and nn.linear. It would help us compare the numpy output to torch output for the same code, and give us some modular code/functions to use. Specifically, a numpy equivalent for the following would be great:

rnn = nn.LSTM(10, 20, 2)
input = torch.randn(5, 3, 10)
h0 = torch.randn(2, 3, 20)
c0 = torch.randn(2, 3, 20)
output, (hn, cn) = rnn(input, (h0, c0))

and also for linear:

m = nn.Linear(20, 30)
input = torch.randn(128, 20)
output = m(input)

Upvotes: 3

Views: 1073

Answers (2)

ibadia

Reputation: 919

Implementing the network in numpy and copying the weights from your PyTorch model does the trick. For your use case you only need a forward pass, so that is all we have to implement.

#Set Parameters for a small LSTM network
input_size  = 2 # size of one 'event', or sample, in our batch of data
hidden_dim  = 3 # 3 cells in the LSTM layer
output_size = 1 # desired model output

num_layers=3
torch_lstm = RNN( input_size, 
                 hidden_dim ,
                 output_size,
                 num_layers,
                 matching_in_out=True
                 )

state = torch_lstm.state_dict() # state will capture the weights of your model

For the LSTM in numpy, the following functions are used. The code below comes from this link: https://towardsdatascience.com/the-lstm-reference-card-6163ca98ae87

### NOT MY CODE
import numpy as np 
from scipy.special import expit as sigmoid

def forget_gate(x, h, Weights_hf, Bias_hf, Weights_xf, Bias_xf, prev_cell_state):
    forget_hidden  = np.dot(Weights_hf, h) + Bias_hf
    forget_eventx  = np.dot(Weights_xf, x) + Bias_xf
    return np.multiply( sigmoid(forget_hidden + forget_eventx), prev_cell_state )

def input_gate(x, h, Weights_hi, Bias_hi, Weights_xi, Bias_xi, Weights_hl, Bias_hl, Weights_xl, Bias_xl):
    ignore_hidden  = np.dot(Weights_hi, h) + Bias_hi
    ignore_eventx  = np.dot(Weights_xi, x) + Bias_xi
    learn_hidden   = np.dot(Weights_hl, h) + Bias_hl
    learn_eventx   = np.dot(Weights_xl, x) + Bias_xl
    return np.multiply( sigmoid(ignore_eventx + ignore_hidden), np.tanh(learn_eventx + learn_hidden) )


def cell_state(forget_gate_output, input_gate_output):
    return forget_gate_output + input_gate_output

  
def output_gate(x, h, Weights_ho, Bias_ho, Weights_xo, Bias_xo, cell_state):
    out_hidden = np.dot(Weights_ho, h) + Bias_ho
    out_eventx = np.dot(Weights_xo, x) + Bias_xo
    return np.multiply( sigmoid(out_eventx + out_hidden), np.tanh(cell_state) )

If scipy is not available in the target environment, a plain numpy sigmoid can replace the expit import above:

def sigmoid(x):
    return 1/(1 + np.exp(-x))

PyTorch stores the weights of the four gates stacked in a single matrix, so we need to split it up. The function below computes the row ranges for that:

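def get_slices(hidden_dim):
    # PyTorch stacks the four gate blocks (input, forget, cell, output) along
    # the first axis, so each block spans hidden_dim rows (not a fixed 3).
    return [[i, i + hidden_dim] for i in range(0, 4 * hidden_dim, hidden_dim)]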

With the LSTM functions ready, we create an LSTM class that copies the weights from the PyTorch model and computes the output.

class numpy_lstm:
    def __init__( self, layer_num=0, hidden_dim=1, matching_in_out=False):
        self.matching_in_out=matching_in_out
        self.layer_num=layer_num
        self.hidden_dim=hidden_dim
        
    def init_weights_from_pytorch(self, state):
        slices=get_slices(self.hidden_dim)
        print (slices)

        #Event (x) Weights and Biases for all gates
        
        lstm_weight_ih='lstm.weight_ih_l'+str(self.layer_num)
        self.Weights_xi = state[lstm_weight_ih][slices[0][0]:slices[0][1]].numpy()  # shape  [h, x]
        self.Weights_xf = state[lstm_weight_ih][slices[1][0]:slices[1][1]].numpy()  # shape  [h, x]
        self.Weights_xl = state[lstm_weight_ih][slices[2][0]:slices[2][1]].numpy()  # shape  [h, x]
        self.Weights_xo = state[lstm_weight_ih][slices[3][0]:slices[3][1]].numpy() # shape  [h, x]

        
        lstm_bias_ih='lstm.bias_ih_l'+str(self.layer_num)
        self.Bias_xi = state[lstm_bias_ih][slices[0][0]:slices[0][1]].numpy()  #shape is [h]
        self.Bias_xf = state[lstm_bias_ih][slices[1][0]:slices[1][1]].numpy()  #shape is [h]
        self.Bias_xl = state[lstm_bias_ih][slices[2][0]:slices[2][1]].numpy()  #shape is [h]
        self.Bias_xo = state[lstm_bias_ih][slices[3][0]:slices[3][1]].numpy() #shape is [h]
        
        
        lstm_weight_hh='lstm.weight_hh_l'+str(self.layer_num)

        #Hidden state (h) Weights and Biases for all gates
        self.Weights_hi = state[lstm_weight_hh][slices[0][0]:slices[0][1]].numpy()  #shape is [h, h]
        self.Weights_hf = state[lstm_weight_hh][slices[1][0]:slices[1][1]].numpy()  #shape is [h, h]
        self.Weights_hl = state[lstm_weight_hh][slices[2][0]:slices[2][1]].numpy()  #shape is [h, h]
        self.Weights_ho = state[lstm_weight_hh][slices[3][0]:slices[3][1]].numpy() #shape is [h, h]
        
        
        lstm_bias_hh='lstm.bias_hh_l'+str(self.layer_num)

        self.Bias_hi = state[lstm_bias_hh][slices[0][0]:slices[0][1]].numpy()  #shape is [h]
        self.Bias_hf = state[lstm_bias_hh][slices[1][0]:slices[1][1]].numpy()  #shape is [h]
        self.Bias_hl = state[lstm_bias_hh][slices[2][0]:slices[2][1]].numpy()  #shape is [h]
        self.Bias_ho = state[lstm_bias_hh][slices[3][0]:slices[3][1]].numpy() #shape is [h]
    def forward_lstm_pass(self,input_data):
        h = np.zeros(self.hidden_dim)
        c = np.zeros(self.hidden_dim)
        
        output_list=[]
        for eventx in input_data:
            f = forget_gate(eventx, h, self.Weights_hf, self.Bias_hf, self.Weights_xf, self.Bias_xf, c)
            i =  input_gate(eventx, h, self.Weights_hi, self.Bias_hi, self.Weights_xi, self.Bias_xi, 
                        self.Weights_hl, self.Bias_hl, self.Weights_xl, self.Bias_xl)
            c = cell_state(f,i)
            h = output_gate(eventx, h, self.Weights_ho, self.Bias_ho, self.Weights_xo, self.Bias_xo, c)
            if self.matching_in_out: # doesnt make sense but it was as it was in main code :(
                output_list.append(h)
        if self.matching_in_out:
            return output_list
        else:
            return h

Similarly for fully connected layer,

    
    
class fully_connected_layer:
    def __init__(self, state, dict_name='fc'):
        # Keep the full weight matrix so that output_size > 1 also works
        self.fc_Weight = state[dict_name+'.weight'].numpy() #shape is [output_size, hidden_dim]
        self.fc_Bias = state[dict_name+'.bias'].numpy() #shape is [output_size]
        
    def forward(self, lstm_output, is_sigmoid=True):
        # nn.Linear computes x @ W.T + b for each time step (row) of lstm_output
        res = np.dot(np.asarray(lstm_output), self.fc_Weight.T) + self.fc_Bias
        if is_sigmoid:
            return sigmoid(res)
        else:
            return res
        

Now we need one class that ties everything together and generalises it to multiple layers. You can modify the class below if you need more fully connected layers, want to change whether the sigmoid is applied, and so on.

        
class RNN_model_Numpy:
    def __init__(self, state, input_size, hidden_dim, output_size, num_layers, matching_in_out=True):
        self.lstm_layers=[]
        for i in range(0, num_layers):
            lstm_layer_obj=numpy_lstm(layer_num=i, hidden_dim=hidden_dim, matching_in_out=True)
            lstm_layer_obj.init_weights_from_pytorch(state) 
            self.lstm_layers.append(lstm_layer_obj)
        
        self.hidden2out=fully_connected_layer(state, dict_name='hidden2out')
        
    def forward(self, feature_list):
        for x in self.lstm_layers:
            lstm_output=x.forward_lstm_pass(feature_list)
            feature_list=lstm_output
            
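        # The PyTorch model applies torch.sigmoid when matching_in_out=True;
        # pass is_sigmoid=True here to reproduce that output.
        return self.hidden2out.forward(feature_list, is_sigmoid=False)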

Sanity check on a numpy variable:

data = np.array(
           [[1,1],
            [2,2],
            [3,3]])



check=RNN_model_Numpy(state, input_size, hidden_dim, output_size, num_layers)
check.forward(data)

EXPLANATION: Since we only need the forward pass, we only need the functions that make up an LSTM cell: the forget gate, input gate, cell state and output gate. Each is a fixed sequence of matrix products and element-wise nonlinearities applied to the current input and the previous hidden and cell states.
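
For reference, the gate functions above mirror the per-time-step equations given in the nn.LSTM documentation. In the naming used in this answer, i is the "ignore" gate and g is the "learn" candidate; σ is the sigmoid and ⊙ is the element-wise product.

```latex
\begin{aligned}
i_t &= \sigma(W_{ii} x_t + b_{ii} + W_{hi} h_{t-1} + b_{hi}) \\
f_t &= \sigma(W_{if} x_t + b_{if} + W_{hf} h_{t-1} + b_{hf}) \\
g_t &= \tanh(W_{ig} x_t + b_{ig} + W_{hg} h_{t-1} + b_{hg}) \\
o_t &= \sigma(W_{io} x_t + b_{io} + W_{ho} h_{t-1} + b_{ho}) \\
c_t &= f_t \odot c_{t-1} + i_t \odot g_t \\
h_t &= o_t \odot \tanh(c_t)
\end{aligned}
```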

The get_slices function is used to break up the weight matrices taken from the PyTorch state dictionary, which holds the weights of every layer in the network. For an LSTM, PyTorch stacks the gate weights in the order ignore (input), forget, learn (cell), output, so each matrix has to be split into four blocks, one per gate.
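
As a rough illustration of that splitting step, here is a minimal sketch that uses np.split instead of explicit index ranges. The helper name split_gates and the toy weight matrix are made up for this example; only the gate order (input, forget, cell, output) comes from the PyTorch documentation.

```python
import numpy as np

def split_gates(stacked, hidden_dim):
    """Split a stacked LSTM parameter into its four gate blocks.

    `stacked` has 4 * hidden_dim rows (weights) or entries (biases),
    laid out in the order input, forget, cell (g), output.
    """
    stacked = np.asarray(stacked)
    if stacked.shape[0] != 4 * hidden_dim:
        raise ValueError("first dimension must equal 4 * hidden_dim")
    i, f, g, o = np.split(stacked, 4, axis=0)
    return {"i": i, "f": f, "g": g, "o": o}

# Toy stand-in for state['lstm.weight_ih_l0'] with hidden_dim=3, input_size=2
hidden_dim, input_size = 3, 2
w_ih = np.arange(4 * hidden_dim * input_size, dtype=float).reshape(4 * hidden_dim, input_size)
gates = split_gates(w_ih, hidden_dim)
print({name: block.shape for name, block in gates.items()})
```

Each block then has hidden_dim rows, whatever the value of hidden_dim.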

For the numpy_lstm class, init_weights_from_pytorch must be called before anything else. It extracts the weights from the state dictionary of the PyTorch model and populates the numpy weight arrays with them. You can train your model first, save the state dictionary with pickle, and load it later.
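
One caveat: a pickled state dictionary still contains PyTorch tensors, so unpickling it needs torch. A possible workaround, sketched below under that assumption, is to convert every entry to a numpy array on the training machine and store the result as an .npz archive. The helper names save_arrays and load_arrays and the file name rnn_weights.npz are invented for this example, and the arrays are stand-ins for real weights.

```python
import os
import tempfile
import numpy as np

def save_arrays(arrays, path):
    """Write a {name: ndarray} mapping to a single .npz archive."""
    np.savez(path, **arrays)

def load_arrays(path):
    """Read the archive back into a plain dict of ndarrays."""
    with np.load(path) as archive:
        return {name: archive[name] for name in archive.files}

# Stand-in for the converted state dict. On the training machine it would be
# built from the real model, e.g. {k: v.cpu().numpy() for k, v in state.items()}.
arrays = {
    "lstm.weight_ih_l0": np.ones((12, 2), dtype=np.float32),
    "hidden2out.bias": np.zeros(1, dtype=np.float32),
}

path = os.path.join(tempfile.mkdtemp(), "rnn_weights.npz")
save_arrays(arrays, path)
restored = load_arrays(path)
print(sorted(restored))
```

With weights loaded this way, the .numpy() calls in the classes above are no longer needed, because the entries are already numpy arrays.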

The fully connected layer class just implements the hidden2out neural network.
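
For comparison with the nn.Linear(20, 30) example in the question, this is a minimal numpy sketch of the same computation, y = x W^T + b. The function name linear_forward is made up, and the random weight and bias are stand-ins for the '.weight' and '.bias' entries of a state dictionary.

```python
import numpy as np

def linear_forward(x, weight, bias):
    """Compute x @ weight.T + bias, with weight shaped (out_features, in_features)."""
    return np.asarray(x) @ np.asarray(weight).T + np.asarray(bias)

rng = np.random.default_rng(0)
weight = rng.standard_normal((30, 20))  # stand-in for the layer's weight matrix
bias = rng.standard_normal(30)          # stand-in for the layer's bias vector
x = rng.standard_normal((128, 20))      # batch of 128 input rows

y = linear_forward(x, weight, bias)
print(y.shape)
```

The bias is broadcast over the rows, so the same function works for a single vector and for a whole batch.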

Finally, the RNN_model_Numpy class chains everything together: if the model has multiple LSTM layers, it passes the output sequence of one layer on as the input of the next.
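
The layer stacking can also be written more compactly by keeping the four gates stacked and splitting only the pre-activations. The following is a sketch of that idea, not the code used above: the function names are invented, batch size 1 is assumed, and the random weights stand in for the weight_ih, weight_hh, bias_ih and bias_hh entries of each layer.

```python
import numpy as np

def sigmoid(z):
    return 1.0 / (1.0 + np.exp(-z))

def lstm_layer_forward(x_seq, w_ih, w_hh, b_ih, b_hh, h0=None, c0=None):
    """Run one LSTM layer over x_seq of shape (seq_len, input_size)."""
    hidden = w_hh.shape[1]
    h = np.zeros(hidden) if h0 is None else np.asarray(h0, dtype=float)
    c = np.zeros(hidden) if c0 is None else np.asarray(c0, dtype=float)
    outputs = []
    for x in np.asarray(x_seq, dtype=float):
        # Pre-activations of all four gates at once, order: i, f, g, o
        z = w_ih @ x + b_ih + w_hh @ h + b_hh
        i, f, g, o = np.split(z, 4)
        c = sigmoid(f) * c + sigmoid(i) * np.tanh(g)
        h = sigmoid(o) * np.tanh(c)
        outputs.append(h)
    return np.stack(outputs), h, c

def stacked_lstm_forward(x_seq, layers):
    """Feed the full output sequence of each layer into the next one."""
    seq = x_seq
    for w_ih, w_hh, b_ih, b_hh in layers:
        seq, _, _ = lstm_layer_forward(seq, w_ih, w_hh, b_ih, b_hh)
    return seq

rng = np.random.default_rng(0)
input_size, hidden_dim = 2, 3
layers = [
    (rng.standard_normal((12, input_size)), rng.standard_normal((12, hidden_dim)),
     rng.standard_normal(12), rng.standard_normal(12)),
    (rng.standard_normal((12, hidden_dim)), rng.standard_normal((12, hidden_dim)),
     rng.standard_normal(12), rng.standard_normal(12)),
]
data = np.array([[1, 1], [2, 2], [3, 3]])
out = stacked_lstm_forward(data, layers)
print(out.shape)
```

Only the first layer sees vectors of length input_size; every later layer receives the hidden_dim-sized outputs of the layer before it.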

Lastly, there is a small sanity check on the data variable.

IMPORTANT NOTE: You might get a dimension error, because PyTorch handles input shapes differently. Make sure your numpy input has the same shape as the data variable above, i.e. (sequence length, input_size).
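
If the input is still in the (seq_len, batch, input_size) layout that nn.LSTM expects, a small helper along these lines could bring it into the shape used here. It is only a sketch that assumes batch size 1; the name to_sequence_of_vectors is made up.

```python
import numpy as np

def to_sequence_of_vectors(x):
    """Convert a (seq_len, 1, input_size) array to (seq_len, input_size)."""
    x = np.asarray(x, dtype=float)
    if x.ndim == 3:
        if x.shape[1] != 1:
            raise ValueError("only batch size 1 is supported in this sketch")
        x = x[:, 0, :]  # drop the batch axis
    if x.ndim != 2:
        raise ValueError("expected shape (seq_len, input_size)")
    return x

torch_style = np.zeros((3, 1, 2))  # same layout as feature_list.view(len, 1, -1)
print(to_sequence_of_vectors(torch_style).shape)
```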

Important references: https://pytorch.org/docs/stable/generated/torch.nn.LSTM.html

https://christinakouridi.blog/2019/06/19/backpropagation-lstm/

Upvotes: 1

Bob

Reputation: 14654

You should try to export the model using torch.onnx. The page gives you an example that you can start with.

An alternative is to use TorchScript, but that requires torch libraries.

Both of these can be run without python. You can load torchscript in a C++ application https://pytorch.org/tutorials/advanced/cpp_export.html

ONNX is much more portable, and you can use it from languages such as C#, Java, or JavaScript https://onnxruntime.ai/ (even in the browser)

A running example

I modified your example slightly to work around the errors I ran into.

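Note that tracing records only the operations executed for the example inputs, so any if/elif/else branch is fixed to the path that was taken, and any for/while loop is unrolled to the number of iterations seen during tracing.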

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import random

torch.manual_seed(1)
random.seed(1)
device = torch.device('cpu')

class RNN(nn.Module):
  def __init__(self, input_size, hidden_size, output_size,num_layers, matching_in_out=False, batch_size=1):
    super(RNN, self).__init__()
    self.input_size = input_size
    self.hidden_size = hidden_size
    self.output_size = output_size
    self.num_layers = num_layers
    self.batch_size = batch_size
    self.matching_in_out = matching_in_out #length of input vector matches the length of output vector 
    self.lstm = nn.LSTM(input_size, hidden_size,num_layers)
    self.hidden2out = nn.Linear(hidden_size, output_size)
  def forward(self, x, h0, c0):
    lstm_out, (hidden_a, hidden_b) = self.lstm(x, (h0, c0))
    outs=self.hidden2out(lstm_out)
    return outs, (hidden_a, hidden_b)
  def init_hidden(self):
    #return torch.rand(self.num_layers, self.batch_size, self.hidden_size)
    return (torch.rand(self.num_layers, self.batch_size, self.hidden_size).to(device).detach(),
            torch.rand(self.num_layers, self.batch_size, self.hidden_size).to(device).detach())

# convert the arguments passed during onnx.export call
class MWrapper(nn.Module):
    def __init__(self, model):
        super(MWrapper, self).__init__()
        self.model = model
    def forward(self, kwargs):
        return self.model(**kwargs)

Run an example

rnn = RNN(10, 10, 10, 3)
X = torch.randn(3,1,10)
h0,c0  = rnn.init_hidden()
print(rnn(X, h0, c0)[0])

Use the same input to trace the model and export an onnx file


torch.onnx.export(MWrapper(rnn), {'x':X,'h0':h0,'c0':c0}, 'rnn.onnx', 
                  dynamic_axes={'x':{1:'N'},
                               'c0':{1: 'N'},
                               'h0':{1: 'N'}
                               },
                  input_names=['x', 'h0', 'c0'],
                  output_names=['y', 'hn', 'cn']
                 )

Notice that you can use symbolic values for the dimensions of some axes of some inputs. Unspecified dimensions will be fixed with the values from the traced inputs. By default LSTM uses dimension 1 as batch.

Next we load the ONNX model and pass the same inputs

import onnxruntime
ort_model = onnxruntime.InferenceSession('rnn.onnx')
print(ort_model.run(['y'], {'x':X.numpy(), 'c0':c0.numpy(), 'h0':h0.numpy()}))
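
Rather than comparing the two printed tensors by eye, the outputs can be checked numerically. The helper below is a sketch with a made-up name and an arbitrarily chosen tolerance; the two arrays are stand-ins for rnn(X, h0, c0)[0].detach().numpy() and the first element of the list returned by ort_model.run.

```python
import numpy as np

def report_difference(reference, candidate, atol=1e-5):
    """Return (max absolute difference, whether it is within atol)."""
    reference = np.asarray(reference, dtype=np.float64)
    candidate = np.asarray(candidate, dtype=np.float64)
    if reference.shape != candidate.shape:
        raise ValueError(f"shape mismatch: {reference.shape} vs {candidate.shape}")
    max_abs = float(np.max(np.abs(reference - candidate)))
    return max_abs, max_abs <= atol

# Stand-ins for the PyTorch output and the ONNX Runtime output
torch_out = np.array([[0.10, 0.20], [0.30, 0.40]])
onnx_out = torch_out + 1e-7
max_abs, ok = report_difference(torch_out, onnx_out)
print(ok)
```

Small differences in the last few float32 digits are expected between runtimes, which is why a tolerance is used instead of exact equality.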

Upvotes: 4
