My LSTM has three input features (x, y, z) and is used to predict the next time step of z. I use a lookback window of 9 timesteps, and I then forecast further z values with a recursive function that feeds each prediction back in. However, I get bad results when I plot my forecasted z values. The data comes from a CSV file that I cannot share.
This is my code:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim

# Stack z first so that column 0 is the target the labels are taken from
data = np.column_stack((z, x, y))

def df_to_X_y(data, window_size=9):
    X = []
    y = []
    for i in range(len(data) - window_size):
        row = data[i:i + window_size]
        X.append(row)
        label = data[i + window_size, 0]  # next z value (column 0)
        y.append(label)
    return np.array(X), np.array(y)

X, y = df_to_X_y(data)

split_ratio = 0.8
split_idx = int(len(X) * split_ratio)
X_train = X[:split_idx]
X_test = X[split_idx:]
y_train = y[:split_idx]
y_test = y[split_idx:]

X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.float32)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test, dtype=torch.float32)
class LSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, target_size):
        super(LSTMModel, self).__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, target_size)  # Output layer for regression

    def forward(self, x):
        lstm_out, _ = self.lstm(x)
        out = self.fc(lstm_out[:, -1, :])
        return out

    def forecast(self, initial_input, num_steps):
        predictions = []  # Store forecasted z values
        current_input = initial_input.clone()  # Clone to avoid modifying the original input
        for _ in range(num_steps):
            next_output = self.forward(current_input)
            predictions.append(next_output.unsqueeze(1))  # Add time dimension
            next_input = current_input.clone()  # Clone the current input
            next_input[:, :-1, :] = current_input[:, 1:, :]  # Shift the window by 1 step
            next_input[:, -1, 0] = next_output.squeeze(1)  # Update z with the prediction
            # x and y in next_input[:, -1, 1:] are retained from the clone
            current_input = next_input  # Update for the next iteration
        return torch.cat(predictions, dim=1)  # Concatenate predictions along the time dimension
input_size = 3
hidden_size = 50
num_layers = 3
learning_rate = 0.04
num_epochs = 50
target_size = 1
model = LSTMModel(input_size, hidden_size, num_layers, target_size)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
train_loss_values = []
test_loss_values = []

for epoch in range(num_epochs):
    model.train()
    optimizer.zero_grad()
    y_pred = model(X_train_tensor)
    train_loss = criterion(y_pred.squeeze(), y_train_tensor)
    train_loss.backward()
    optimizer.step()
    train_loss_values.append(train_loss.item())

    model.eval()
    with torch.no_grad():
        y_test_pred = model(X_test_tensor)
        test_loss = criterion(y_test_pred.squeeze(), y_test_tensor)
        test_loss_values.append(test_loss.item())

    if (epoch + 1) % 10 == 0:
        print(f'Epoch [{epoch + 1}/{num_epochs}], Train Loss: {train_loss.item():.4f}, Test Loss: {test_loss.item():.4f}')
# Number of steps to forecast
num_steps = 9

# Use the last sequence from the test set as the initial input
initial_input = X_test_tensor[-1].unsqueeze(0)  # Shape: (1, seq_length, input_size)

# Perform multi-step forecasting
model.eval()
with torch.no_grad():
    forecasted_values = model.forecast(initial_input, num_steps)
Your forecast code doesn't use the hidden state from the previous time step, so the model isn't able to use any sequence information. You need to design your model to allow you to pass a hidden state to the forward method and use that hidden state during inference. Something like this:
class LSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, target_size):
        super(LSTMModel, self).__init__()
        self.num_layers = num_layers
        self.hidden_size = hidden_size
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, target_size)  # Output layer for regression

    def forward(self, x, hidden=None):
        if hidden is None:
            hidden = self.get_hidden(x)
        lstm_out, hidden = self.lstm(x, hidden)
        out = self.fc(lstm_out[:, -1, :])
        return out, hidden

    def get_hidden(self, x):
        # hidden state is of shape (n_layers, batch_size, d_hidden)
        # batch size is `x.shape[0]` for `batch_first=True`
        hidden = (
            torch.zeros(self.num_layers, x.shape[0], self.hidden_size, device=x.device),
            torch.zeros(self.num_layers, x.shape[0], self.hidden_size, device=x.device),
        )
        return hidden

    def forecast(self, initial_input, num_steps):
        predictions = []  # Store forecasted z values
        current_input = initial_input.clone()  # Clone to avoid modifying the original input
        hidden = None  # initial hidden state
        with torch.no_grad():
            for _ in range(num_steps):
                # use and update hidden state for forward pass
                next_output, hidden = self.forward(current_input, hidden)
                predictions.append(next_output.unsqueeze(1))  # Add time dimension
                # Build the next one-step input: feed the prediction back as z
                # (column 0) and carry x and y over from the last observed step,
                # since the model expects 3 features and future x, y are unknown
                current_input = current_input[:, -1:, :].clone()
                current_input[:, :, 0] = next_output
        return torch.cat(predictions, dim=1)  # Concatenate predictions along the time dimension
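With that class in place, the call from the question works the same way. A minimal usage sketch, where the shapes assume the 9-step window and 3 input features from the question:

model = LSTMModel(input_size=3, hidden_size=50, num_layers=3, target_size=1)
# ... train exactly as in the question ...
model.eval()
initial_input = X_test_tensor[-1].unsqueeze(0)        # shape: (1, 9, 3)
forecasted_values = model.forecast(initial_input, 9)  # shape: (1, 9, 1)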