Reputation: 45
I am training a reinforcement learning agent with the double DQN algorithm. The goal is to turn every cell of a two-dimensional board from zero to one. For training to count as successful, the model needs to reach the goal about 10 times in a row. I then saved the trained model and loaded it to run inference. However, during inference the model failed miserably: I ran 100 episodes and it did not succeed even once. Is there a solution?
The code below takes the code from the PyTorch tutorial and changes the environment and model structure.
Tutorial link: https://pytorch.org/tutorials/intermediate/reinforcement_q_learning.html
import gymnasium as gym
import math
import random
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
import os
from collections import namedtuple, deque

os.environ['KMP_DUPLICATE_LIB_OK'] = 'True'

Transition = namedtuple('Transition',
                        ('state', 'action', 'next_state', 'reward'))

class ReplayMemory(object):
    def __init__(self, capacity):
        self.memory = deque([], maxlen=capacity)

    def push(self, *args):
        self.memory.append(*args)

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)
class DQN(nn.Module):
    def __init__(self, n_observations, n_actions):
        super(DQN, self).__init__()
        self.layer1 = nn.Linear(n_observations, 512)
        self.layer2 = nn.Linear(512, 512)
        self.layer3 = nn.Linear(512, 256)
        self.layer4 = nn.Linear(256, n_actions)

    def forward(self, inputs):
        x = F.relu(self.layer1(inputs))
        x = F.relu(self.layer2(x))
        x = F.relu(self.layer3(x))
        return self.layer4(x)
class CustomEnv(gym.Env):
    def __init__(self, max_x, max_y):
        self.max_x = max_x
        self.max_y = max_y

    def reset(self):
        self.board = np.full((self.max_y, self.max_x), 0)
        self.n_same = 0
        return self.board

    def step(self, action):
        reward = 0
        done = False
        # decode the action into a tile type (0 or 1) and a board position
        tile_type = action % 2
        temp = action // 2
        y = temp // self.max_x
        x = temp % self.max_x
        if self.board[y][x] != tile_type:
            self.board[y][x] = tile_type
            if tile_type == 1:
                self.n_same += 1
                reward = 1
                if self.n_same == self.max_y * self.max_x:
                    reward = 100
                    done = True
            else:
                self.n_same -= 1
                reward = -1
        return self.board, reward, done
BATCH_SIZE = 128
GAMMA = 0.99
EPS_START = 0.9
EPS_END = 0.05
EPS_DECAY = 1000
TAU = 0.005
LR = 1e-4
SEED = 42

np.random.seed(SEED)

env = CustomEnv(9, 7)
find_device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
state = env.reset()
n_observations = env.max_x * env.max_y
action_space = gym.spaces.Discrete(n_observations * 2)

policy_net = DQN(n_observations, action_space.n).to(find_device)
target_net = DQN(n_observations, action_space.n).to(find_device)
target_net.load_state_dict(policy_net.state_dict())

optimizer = optim.AdamW(policy_net.parameters(), lr=LR, amsgrad=True)
memory = ReplayMemory(10000)

steps_done = 0

def select_action(state):
    global steps_done
    sample = random.random()
    eps_threshold = EPS_END + (EPS_START - EPS_END) * \
        math.exp(-1. * steps_done / EPS_DECAY)
    steps_done += 1
    if sample > eps_threshold:
        with torch.no_grad():
            return policy_net(state).max(1)[1].view(1, 1)
    else:
        return torch.tensor([[action_space.sample()]], device=find_device, dtype=torch.long)

episode_durations = []
def optimize_model():
    if len(memory) < BATCH_SIZE:
        return
    transitions = memory.sample(BATCH_SIZE)
    batch = Transition(*zip(*transitions))
    non_final_mask = torch.tensor(tuple(map(lambda s: s is not None,
                                            batch.next_state)), device=find_device, dtype=torch.bool)
    non_final_next_states = torch.cat([s for s in batch.next_state
                                       if s is not None])
    state_batch = torch.cat(batch.state)
    action_batch = torch.cat(batch.action)
    reward_batch = torch.cat(batch.reward)
    state_action_values = policy_net(state_batch).gather(1, action_batch)
    next_state_values = torch.zeros(BATCH_SIZE, device=find_device)
    with torch.no_grad():
        next_state_values[non_final_mask] = target_net(non_final_next_states).max(1)[0]
    expected_state_action_values = (next_state_values * GAMMA) + reward_batch
    criterion = nn.SmoothL1Loss()
    loss = criterion(state_action_values, expected_state_action_values.unsqueeze(1))
    optimizer.zero_grad()
    loss.backward()
    torch.nn.utils.clip_grad_value_(policy_net.parameters(), 100)
    optimizer.step()
def main():
    running_Reward = 0
    for e in range(1000):
        e_reward = 0
        state = env.reset()
        tensor_state = torch.tensor(state.flatten(), dtype=torch.float32, device=find_device).unsqueeze(0)
        s = 0
        while True:
            action = select_action(tensor_state)
            observation, reward, done = env.step(action.item())
            reward = torch.tensor([reward], device=find_device)
            e_reward += reward.item()
            if done:
                next_state = None
            else:
                next_state = torch.tensor(observation.flatten(), dtype=torch.float32, device=find_device).unsqueeze(0)
            # store the transition (reward is already a tensor at this point)
            memory.push(Transition(tensor_state, action, next_state, reward))
            tensor_state = next_state
            optimize_model()
            # soft update of the target network's weights
            target_net_state_dict = target_net.state_dict()
            policy_net_state_dict = policy_net.state_dict()
            for key in policy_net_state_dict:
                target_net_state_dict[key] = policy_net_state_dict[key] * TAU + target_net_state_dict[key] * (1 - TAU)
            target_net.load_state_dict(target_net_state_dict)
            s += 1
            if done or s >= 500:
                running_Reward = running_Reward * (1 - 0.05) + float(e_reward) * 0.05
                break
        if running_Reward > 155:
            print(f"Solved at episode {e}!")
            break
        print(f"Reward for {e}th episode: {e_reward}")
    torch.save(policy_net.state_dict(), './testModel.pth')
def inference():
    policy_net.load_state_dict(torch.load('./testModel.pth', map_location=find_device))
    policy_net.eval()
    with torch.no_grad():
        for e in range(100):
            e_reward = 0
            state = env.reset()
            for _ in range(500):
                tensor_state = torch.tensor(state.flatten(), dtype=torch.float32, device=find_device).unsqueeze(0)
                action = policy_net(tensor_state).max(1)[1].view(1, 1)
                state, r, done = env.step(action.item())
                e_reward += r
                if done:
                    break
            print(f"Reward for {e}th inference episode: {e_reward}")

Train = False
if Train:
    main()
else:
    inference()
Below is the total reward the model collected per episode towards the end of training. A value of 162 means the model solved the board in that episode.
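(For reference, if I read the reward logic in step() correctly, 162 is the maximum possible episode return for the 9x7 board, which starts as all zeros:

cells = 9 * 7                        # 63 cells to flip to 1
max_return = (cells - 1) * 1 + 100   # 62 intermediate rewards of +1, final step gives +100
print(max_return)                    # 162
)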
Reward for 0th episode: 35
Reward for 1th episode: 38
Reward for 2th episode: 53
Reward for 3th episode: 50
Reward for 4th episode: 54
Reward for 5th episode: 54
Reward for 6th episode: 54
...
Reward for 443th episode: 162
Reward for 444th episode: 162
Reward for 445th episode: 162
Reward for 446th episode: 162
Reward for 447th episode: 162
Reward for 448th episode: 162
Reward for 449th episode: 162
Reward for 450th episode: 162
Reward for 451th episode: 162
Reward for 452th episode: 162
Solved at episode 453!
But this is the result of inference:
Reward for 0th episode: 6
Reward for 1th episode: 6
Reward for 2th episode: 6
Reward for 3th episode: 6
Reward for 4th episode: 6
Reward for 5th episode: 6
Reward for 6th episode: 6
Reward for 7th episode: 6
Reward for 8th episode: 6
Reward for 9th episode: 6
Reward for 10th episode: 6
...
I need help. All comments are welcome. Thanks.
Upvotes: 0
Views: 189
Reputation: 1833
Assuming your answer to my comment is yes, I think I know the problem.
If you start your script with Train = False, the variable steps_done is also set to 0. In inference(), you call
action = select_action(tensor_state)
which then calculates a high eps_threshold because steps_done is 0. This most likely leads to a random action, not one the trained policy_net would choose. At the end of training, the model has probably already taken thousands of steps, so eps_threshold is low and the probability of taking an action from the policy_net is high.
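For a sense of the numbers, here is a quick check using the hyperparameters from your code (EPS_START = 0.9, EPS_END = 0.05, EPS_DECAY = 1000):

import math

EPS_START, EPS_END, EPS_DECAY = 0.9, 0.05, 1000

def eps_threshold(steps_done):
    return EPS_END + (EPS_START - EPS_END) * math.exp(-1. * steps_done / EPS_DECAY)

print(eps_threshold(0))       # 0.9   -> about 90% of actions are random when steps_done restarts at 0
print(eps_threshold(10000))   # ~0.05 -> almost always greedy after many training steps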
I think you should disable the random action call at inference time. For example, in select_action(state), change
if sample > eps_threshold:
to
if sample > eps_threshold or not Train:
so that the action always comes from the policy_net at inference.
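Alternatively (a minimal sketch along the same lines, not tested against your full setup), select_action could take an explicit greedy flag instead of depending on the global Train variable:

def select_action(state, greedy=False):
    global steps_done
    sample = random.random()
    eps_threshold = EPS_END + (EPS_START - EPS_END) * \
        math.exp(-1. * steps_done / EPS_DECAY)
    steps_done += 1
    if greedy or sample > eps_threshold:
        # exploit: pick the action with the highest predicted Q-value
        with torch.no_grad():
            return policy_net(state).max(1)[1].view(1, 1)
    else:
        # explore: sample a random action
        return torch.tensor([[action_space.sample()]], device=find_device, dtype=torch.long)

If inference() goes through select_action, it would then call select_action(tensor_state, greedy=True).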
Upvotes: 0