My PPO algorithm code doesn't work and needs fixing


I'm new to AI and have written some algorithm code. But PPO is a bit complex, and I can't figure out what's wrong with my implementation. Can anyone help?

Here is the code I wrote in Colab:

Block 1:

import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

import torch as T
from torch.nn import Sequential
from torch.nn import Linear
from torch.nn import Sigmoid
import torch.nn as nn

import gym

Block 2:

class SimpleModel(nn.Module):
    def __init__(self, obs_space, action_space, is_actor):
        super(SimpleModel, self).__init__()
        if is_actor:
            self.net = Sequential(
                Linear(obs_space, 256),
                Linear(256, 128),
                Linear(128, action_space),
                Sigmoid()
                )
        else:
            self.net = Sequential(
                Linear(obs_space, 256),
                Linear(256, 128),
                Linear(128, 1)
                )
            
    def forward(self, x):
        out = self.net(x)
        return out

class PPOAgent:
    def __init__(self,
                 buffer_cap,
                 obs_space,
                 action_space,
                 gamma=0.99,
                 ld=0.9,
                 eps_clip=0.2,
                 lr=0.0001):
        
        self.buffer_cap = buffer_cap
        self.obs_space = obs_space
        self.action_space = action_space
        self.gamma = gamma
        self.ld = ld # lambda value of GAE
        self.eps_clip = eps_clip
        self.c1 = 1
        self.c2 = 1
        
        self.reset_buffer()

        # AC networks
        self.actor  = SimpleModel(obs_space, action_space, is_actor=True)
        self.critic = SimpleModel(obs_space, action_space, is_actor=False)

        self.optimizer_actor  = T.optim.Adam(self.actor.parameters(), lr=lr)
        self.optimizer_critic = T.optim.Adam(self.critic.parameters(), lr=lr)

    def reset_buffer(self):
        # ReplayBuffer
        self.states      = np.zeros((self.buffer_cap, self.obs_space))
        self.actions     = np.zeros((self.buffer_cap)) # keep single action executed
        self.next_states = np.zeros((self.buffer_cap, self.obs_space))
        # 0 if done = True
        self.dones       = np.zeros((self.buffer_cap))
        self.preds_past  = np.zeros((self.buffer_cap, self.action_space)) # all output nodes in Tensor form
        self.rewards     = np.zeros((self.buffer_cap))

        self.idx = 0

    def push(self, state, action, next_state, done, pred_past, reward):
        self.states[self.idx%self.buffer_cap] = state
        self.actions[self.idx%self.buffer_cap] = action
        self.next_states[self.idx%self.buffer_cap] = next_state
        self.dones[self.idx%self.buffer_cap] = done
        self.preds_past[self.idx%self.buffer_cap] = pred_past.detach().numpy()
        self.rewards[self.idx%self.buffer_cap] = reward
        self.idx += 1

    def sample_batch(self):
        #idx = np.random.randint(0, high=min(self.idx, self.buffer_cap), size=self.buffer_cap)
        return T.tensor(self.states, dtype=T.float32), T.tensor(self.actions, dtype=T.int64), \
            T.tensor(self.next_states, dtype=T.float32), self.dones, \
            T.tensor(self.preds_past, dtype=T.float32), T.tensor(self.rewards, dtype=T.float32)
        
    # 1. Clipped Surrogate Loss
    # 2. GAE
    # 3. Entropy
    # 4. (Optional) Target Network
    def gradient(self):

        ''' 1. Get Advantage with GAE '''

        s, a, next_s, d, pd_past, r = self.sample_batch()

        values = self.predict_state_value(s)
        values_next = self.predict_state_value(next_s)
        advantage = np.zeros(self.buffer_cap+1)

        for t in reversed(range(self.buffer_cap)):
            # Calculate delta_n
            delta = (r[t] + self.gamma * values_next[t] * d[t] - values[t]).detach().numpy()
            # Adv_t = delta_t + gm*ld*Adv_t+1
            # last advantage is 0
            advantage[t] = delta[0] + self.gamma * self.ld * advantage[t+1] * d[t]
        
        advantage = T.tensor(advantage[:-1], dtype=T.float32)
        # print('GAE calculated')

        ''' 2. Get L_clip '''
        # Switch to tensor to get gradient
        #states, actions, next_states, dones, preds_past, rewards = self.sample_batch()

        proba_dist = T.gather(self.actor(s), 1, a.unsqueeze(axis=-1)).squeeze(axis=-1) # get 'action' th value
        preds_past = pd_past[range(len(pd_past)), a.numpy()]
        
        ratio = T.div(proba_dist , preds_past+T.tensor(np.full((self.buffer_cap), 1e-8)))
        #print(ratio)
        L_clip = T.mean(T.min(ratio*advantage, T.clamp(ratio, 1-self.eps_clip, 1+self.eps_clip)*advantage))
        #print(L_clip)
        # print('L_clip calculated')

        ''' 3. Get L_v '''
        L_v = T.nn.functional.mse_loss(values, T.tensor(r, dtype=T.float32)+self.gamma*values_next)
        # print('L_V calculated')
        #print(L_v)

        ''' 4. Get Entropy & sum up everything '''
        action_probs = self.actor(T.tensor(s, dtype=T.float32))
        entropy = T.mean(T.distributions.Categorical(action_probs).entropy())
        #print(entropy)
        #print()
        L_PPO = -T.mean(L_clip - self.c1*L_v + self.c2*entropy)

        self.optimizer_actor.zero_grad()
        self.optimizer_critic.zero_grad()
        L_PPO.backward()
        self.optimizer_actor.step()
        self.optimizer_critic.step()

        # self.reset_buffer()
        # print('Finished episode training')

    # Predict method
    def predict_action(self, states):
        predictions = self.actor(states)
        m = T.distributions.Categorical(predictions)
        actions = m.sample()#.numpy()
        return predictions, actions

    def predict_state_value(self, states):
        values = self.critic(states)
        #print(values)
        return values
        

Block 3:

env = gym.make('CartPole-v1')
obs = env.reset()
# print(obs)

agent = PPOAgent(2000, len(obs), action_space=2)
score = 0
score_hist = []
count = 0
for i in range(4000):
    prediction, action = agent.predict_action(T.tensor(obs))
    obs_next, reward, done, info = env.step(action.numpy())
    score += reward
    agent.push(obs, action, obs_next, done, prediction, reward)
    obs = obs_next

    if done:
        count += 1
        score_hist.append(score)
        print(f"{count}th episode: score {score}")
        score = 0
        env.reset()

    if count%10 == 9 and done:
        agent.gradient()
        print()

plt.plot(score_hist)
plt.show()

Output: output.png

For several days I have been trying to implement the loss shown in the original PPO paper: https://arxiv.org/pdf/1707.06347. Examples on the internet reach the top score within about 100 epochs. I've looked at many of them, but I still can't figure out what's wrong with my code :(
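For reference, my understanding of the clipped surrogate objective from the paper is roughly the following. This is only a minimal standalone sketch with illustrative names and random placeholder tensors, computed from log-probabilities of the old and new policies; it is not taken from my code above:

import torch as T

def clipped_surrogate(log_probs_new, log_probs_old, advantages, eps_clip=0.2):
    # r_t(theta) = pi_theta(a_t|s_t) / pi_theta_old(a_t|s_t), computed from log-probs
    ratio = T.exp(log_probs_new - log_probs_old)
    # L_CLIP = E_t[ min(r_t * A_t, clip(r_t, 1-eps, 1+eps) * A_t) ]
    surrogate = T.min(ratio * advantages,
                      T.clamp(ratio, 1 - eps_clip, 1 + eps_clip) * advantages)
    return surrogate.mean()

# Illustrative usage with random placeholder values (not real rollout data)
log_probs_new = T.randn(8, requires_grad=True)  # from the current policy
log_probs_old = T.randn(8)                      # collected under the old policy, detached
advantages = T.randn(8)
loss = -clipped_surrogate(log_probs_new, log_probs_old, advantages)  # maximize L_CLIP
loss.backward()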

python-3.x pytorch reinforcement-learning