Why are the training and inference results different?

Problem description

I am training a reinforcement learning agent with the Double DQN algorithm.

The goal is to turn a 2D board filled with zeros into all ones.

For training to count as successful, the model needs to reach the goal about 10 times in a row.

The trained model is then saved and loaded to run inference.

However, during inference the model fails miserably.

I ran 100 episodes and it did not succeed even once.

Is there a way to fix this?

The code is below.

import gymnasium as gym
import math
import random
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
import os

from collections import namedtuple, deque

os.environ['KMP_DUPLICATE_LIB_OK']='True'

Transition = namedtuple('Transition',
                        ('state', 'action', 'next_state', 'reward'))

class ReplayMemory(object):
    def __init__(self, capacity):
        self.memory = deque([], maxlen=capacity)

    def push(self, *args):
        self.memory.append(*args)

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

class DQN(nn.Module):
    def __init__(self, n_observations, n_actions):
        super(DQN, self).__init__()
        self.layer1 = nn.Linear(n_observations, 512)
        self.layer2 = nn.Linear(512, 512)
        self.layer3 = nn.Linear(512, 256)
        self.layer4 = nn.Linear(256, n_actions)

    def forward(self, inputs):
        x = F.relu(self.layer1(inputs))
        x = F.relu(self.layer2(x))
        x = F.relu(self.layer3(x))
        return self.layer4(x)

class CustomEnv(gym.Env):   
    def __init__(self, max_x, max_y):
        self.max_x = max_x
        self.max_y = max_y
    
    def reset(self):
        self.board = np.full((self.max_y, self.max_x), 0)        
        self.n_same = 0

        return self.board

    def step(self, action):
        reward = 0
        done = False
        
        # Decode the flat action: the low bit selects the tile value (0 or 1),
        # the remaining bits select the board cell.
        tile_type = action % 2
        temp = action // 2

        y = temp // self.max_x
        x = temp % self.max_x

        if self.board[y][x] != tile_type: 
            self.board[y][x] = tile_type 

            if tile_type == 1 :
                self.n_same += 1
                reward = 1

                if self.n_same == self.max_y * self.max_x:
                    reward = 100
                    done = True
            else : 
                self.n_same -= 1
                reward = -1

        return self.board, reward, done
    
BATCH_SIZE = 128
GAMMA = 0.99
EPS_START = 0.9
EPS_END = 0.05
EPS_DECAY = 1000
TAU = 0.005
LR = 1e-4
SEED = 42

np.random.seed(SEED)

env = CustomEnv(9, 7)

find_device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
state = env.reset()
n_observations = env.max_x * env.max_y
action_space = gym.spaces.Discrete(n_observations * 2)

policy_net = DQN(n_observations, action_space.n).to(find_device)
target_net = DQN(n_observations, action_space.n).to(find_device)
target_net.load_state_dict(policy_net.state_dict())

optimizer = optim.AdamW(policy_net.parameters(), lr=LR, amsgrad=True)
memory = ReplayMemory(10000)

steps_done = 0

def select_action(state):
    global steps_done
    sample = random.random()
    eps_threshold = EPS_END + (EPS_START - EPS_END) * \
        math.exp(-1. * steps_done / EPS_DECAY)
    steps_done += 1
    if sample > eps_threshold:
        with torch.no_grad():
            return policy_net(state).max(1)[1].view(1, 1)
    else:
        return torch.tensor([[action_space.sample()]], device=find_device, dtype=torch.long)

episode_durations = []

def optimize_model():
    if len(memory) < BATCH_SIZE:
        return
    transitions = memory.sample(BATCH_SIZE)
    batch = Transition(*zip(*transitions))

    non_final_mask = torch.tensor(tuple(map(lambda s: s is not None,
                                          batch.next_state)), device=find_device, dtype=torch.bool)
    non_final_next_states = torch.cat([s for s in batch.next_state
                                                if s is not None])
    state_batch = torch.cat(batch.state)
    action_batch = torch.cat(batch.action)
    reward_batch = torch.cat(batch.reward)
    state_action_values = policy_net(state_batch).gather(1, action_batch)

    next_state_values = torch.zeros(BATCH_SIZE, device=find_device)
    with torch.no_grad():
        next_state_values[non_final_mask] = target_net(non_final_next_states).max(1)[0]
    expected_state_action_values = (next_state_values * GAMMA) + reward_batch

    criterion = nn.SmoothL1Loss()
    loss = criterion(state_action_values, expected_state_action_values.unsqueeze(1))

    optimizer.zero_grad()
    loss.backward()
    torch.nn.utils.clip_grad_value_(policy_net.parameters(), 100)
    optimizer.step()

def main() :
    running_Reward = 0

    for e in range(1000):
        e_reward = 0
        state = env.reset()
        tensor_state = torch.tensor(state.flatten(), dtype=torch.float32, device=find_device).unsqueeze(0)
        s = 0

        while True:
            action = select_action(tensor_state)
            observation, reward, done = env.step(action.item())
            reward = torch.tensor([reward], device=find_device)
            e_reward += reward.item()

            if done: 
                next_state = None

            else:
                next_state = torch.tensor(observation.flatten(), dtype=torch.float32, device=find_device).unsqueeze(0)

            memory.push(Transition(tensor_state, action, next_state, reward))  # reward was already converted to a tensor above
            tensor_state = next_state
            optimize_model()

            # Soft-update the target network toward the policy network (Polyak averaging with TAU).
            target_net_state_dict = target_net.state_dict()
            policy_net_state_dict = policy_net.state_dict()

            for key in policy_net_state_dict:
                target_net_state_dict[key] = policy_net_state_dict[key] * TAU + target_net_state_dict[key] * (1 - TAU)
            target_net.load_state_dict(target_net_state_dict)

            s += 1
            if done or s >= 500: 
                running_Reward = running_Reward * (1 - 0.05) + float(e_reward) * 0.05   
                break

        if running_Reward > 155:
            print(f"Solved at episode {e}!")
            break

        print(f"Reward for {e}th episode: {e_reward}")
    
    torch.save(policy_net.state_dict(), './testModel.pth')

def inference():
    policy_net.load_state_dict(torch.load('./testModel.pth', map_location=find_device))
    policy_net.eval()

    with torch.no_grad():
        for e in range(100) :
            e_reward = 0
            state = env.reset()
            
            for _ in range(500) :
                tensor_state = torch.tensor(state.flatten(), dtype=torch.float32, device=find_device).unsqueeze(0)
                action = select_action(tensor_state)
                state, r, done = env.step(action.item())
                e_reward += r

                if done :
                    break
                
            print(f"Reward for {e}th episode: {e_reward}")


Train = False

if Train:
    main()

else :
    inference()

Below is the total reward the model earned per episode at the end of training.

A value of 162 means the model found the correct answer in that episode (on the 9x7 board, 62 tile placements give +1 each and the final one gives the +100 bonus, for a total of 162).

...
Reward for 438th episode: 162
Reward for 439th episode: 162
Reward for 440th episode: 162
Reward for 441th episode: 61
Reward for 442th episode: 162
Reward for 443th episode: 162
Reward for 444th episode: 162
Reward for 445th episode: 162
Reward for 446th episode: 162
Reward for 447th episode: 162
Reward for 448th episode: 162
Reward for 449th episode: 162
Reward for 450th episode: 162
Reward for 451th episode: 162
Reward for 452th episode: 162
Solved at episode 453!

I need help. Any comments are welcome. Thank you.

python tensorflow reinforcement-learning
1 Answer

Assuming the answer to my comment is yes, I think I know what the problem is.
If you start the script with Train=False, the variable steps_done is also initialized to 0. In inference(), you call

action = select_action(tensor_state)

and because steps_done = 0, the computed eps_threshold is high. This most likely leads to a random action instead of the action the trained policy_net would choose. By the end of training, the model has probably already taken well over 1000 steps, so eps_threshold is low and the action is taken from policy_net with high probability.
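
For illustration, here is the question's epsilon schedule evaluated at the start of a fresh run versus after a few thousand steps (the 5000-step value is just an assumed late-training step count):

import math

# Constants from the question's script.
EPS_START, EPS_END, EPS_DECAY = 0.9, 0.05, 1000

def eps_threshold(steps_done):
    # Same formula as in select_action().
    return EPS_END + (EPS_START - EPS_END) * math.exp(-1. * steps_done / EPS_DECAY)

print(eps_threshold(0))     # ~0.90  -> about 90% random actions when inference starts fresh
print(eps_threshold(5000))  # ~0.056 -> almost always greedy late in training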

I think you should disable the random-action branch at inference time. For example, in select_action(state), change

if sample > eps_threshold:

to

if sample > eps_threshold or not Train:

so that at inference time the action is always taken from policy_net.
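
Equivalently, here is a sketch of select_action with an explicit greedy flag (the flag name is my own choice rather than something from the question) so that the inference loop never falls back to random exploration:

def select_action(state, greedy=False):
    global steps_done
    sample = random.random()
    eps_threshold = EPS_END + (EPS_START - EPS_END) * \
        math.exp(-1. * steps_done / EPS_DECAY)
    steps_done += 1
    if greedy or sample > eps_threshold:
        with torch.no_grad():
            # Take the action with the highest Q-value from the trained policy_net.
            return policy_net(state).max(1)[1].view(1, 1)
    else:
        return torch.tensor([[action_space.sample()]], device=find_device, dtype=torch.long)

# In inference():
#     action = select_action(tensor_state, greedy=True)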
