I'm new to AI and have written some algorithm code, but PPO is a bit complex and I can't figure out what's wrong with my implementation. Can anyone help?
Here is the code I wrote in Colab:
Cell 1:
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import torch as T
from torch.nn import Sequential
from torch.nn import Linear
from torch.nn import Sigmoid
import torch.nn as nn
import gym
Cell 2:
class SimpleModel(nn.Module):
    def __init__(self, obs_space, action_space, is_actor):
        super(SimpleModel, self).__init__()
        if is_actor:
            self.net = Sequential(
                Linear(obs_space, 256),
                Linear(256, 128),
                Linear(128, action_space),
                Sigmoid()
            )
        else:
            self.net = Sequential(
                Linear(obs_space, 256),
                Linear(256, 128),
                Linear(128, 1)
            )

    def forward(self, x):
        out = self.net(x)
        return out


class PPOAgent:
    def __init__(self,
                 buffer_cap,
                 obs_space,
                 action_space,
                 gamma=0.99,
                 ld=0.9,
                 eps_clip=0.2,
                 lr=0.0001):
        self.buffer_cap = buffer_cap
        self.obs_space = obs_space
        self.action_space = action_space
        self.gamma = gamma
        self.ld = ld  # lambda value of GAE
        self.eps_clip = eps_clip
        self.c1 = 1
        self.c2 = 1
        self.reset_buffer()
        # Actor-critic networks
        self.actor = SimpleModel(obs_space, action_space, is_actor=True)
        self.critic = SimpleModel(obs_space, action_space, is_actor=False)
        self.optimizer_actor = T.optim.Adam(self.actor.parameters(), lr=lr)
        self.optimizer_critic = T.optim.Adam(self.critic.parameters(), lr=lr)

    def reset_buffer(self):
        # Replay buffer
        self.states = np.zeros((self.buffer_cap, self.obs_space))
        self.actions = np.zeros((self.buffer_cap))  # keep the single action executed
        self.next_states = np.zeros((self.buffer_cap, self.obs_space))
        self.dones = np.zeros((self.buffer_cap))  # 0 if done = True
        self.preds_past = np.zeros((self.buffer_cap, self.action_space))  # all output nodes of the actor
        self.rewards = np.zeros((self.buffer_cap))
        self.idx = 0

    def push(self, state, action, next_state, done, pred_past, reward):
        self.states[self.idx % self.buffer_cap] = state
        self.actions[self.idx % self.buffer_cap] = action
        self.next_states[self.idx % self.buffer_cap] = next_state
        self.dones[self.idx % self.buffer_cap] = done
        self.preds_past[self.idx % self.buffer_cap] = pred_past.detach().numpy()
        self.rewards[self.idx % self.buffer_cap] = reward
        self.idx += 1

    def sample_batch(self):
        # idx = np.random.randint(0, high=min(self.idx, self.buffer_cap), size=self.buffer_cap)
        return T.tensor(self.states, dtype=T.float32), T.tensor(self.actions, dtype=T.int64), \
               T.tensor(self.next_states, dtype=T.float32), self.dones, \
               T.tensor(self.preds_past, dtype=T.float32), T.tensor(self.rewards, dtype=T.float32)

    # 1. Clipped surrogate loss
    # 2. GAE
    # 3. Entropy
    # 4. (Optional) target network
    def gradient(self):
        ''' 1. Get advantage with GAE '''
        s, a, next_s, d, pd_past, r = self.sample_batch()
        values = self.predict_state_value(s)
        values_next = self.predict_state_value(next_s)
        advantage = np.zeros(self.buffer_cap + 1)
        # Adv_t = delta_t + gamma * lambda * Adv_{t+1}, with the last advantage = 0
        # (see the toy check of this recursion after this cell)
        for t in reversed(range(self.buffer_cap)):
            # delta_t = r_t + gamma * V(s_{t+1}) * mask_t - V(s_t)
            delta = (r[t] + self.gamma * values_next[t] * d[t] - values[t]).detach().numpy()
            # delta has shape (1,) because the critic outputs (N, 1), hence delta[0]
            advantage[t] = delta[0] + self.gamma * self.ld * advantage[t + 1] * d[t]
        advantage = T.tensor(advantage[:-1], dtype=T.float32)

        ''' 2. Get L_clip '''
        # Probability of the action actually taken, under the current actor
        proba_dist = T.gather(self.actor(s), 1, a.unsqueeze(-1)).squeeze(-1)
        # ... and under the actor at collection time (stored in the buffer)
        preds_past = pd_past[range(len(pd_past)), a.numpy()]
        ratio = T.div(proba_dist, preds_past + T.tensor(np.full((self.buffer_cap), 1e-8)))
        L_clip = T.mean(T.min(ratio * advantage, T.clamp(ratio, 1 - self.eps_clip, 1 + self.eps_clip) * advantage))

        ''' 3. Get L_v '''
        L_v = T.nn.functional.mse_loss(values, T.tensor(r, dtype=T.float32) + self.gamma * values_next)

        ''' 4. Get entropy & sum everything up '''
        action_probs = self.actor(T.tensor(s, dtype=T.float32))
        entropy = T.mean(T.distributions.Categorical(action_probs).entropy())
        L_PPO = -T.mean(L_clip - self.c1 * L_v + self.c2 * entropy)
        self.optimizer_actor.zero_grad()
        self.optimizer_critic.zero_grad()
        L_PPO.backward()
        self.optimizer_actor.step()
        self.optimizer_critic.step()
        # self.reset_buffer()

    def predict_action(self, states):
        predictions = self.actor(states)
        m = T.distributions.Categorical(predictions)
        actions = m.sample()
        return predictions, actions

    def predict_state_value(self, states):
        values = self.critic(states)
        return values
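To convince myself the GAE recursion in gradient() does what I intend, I hand-checked it on a tiny made-up trajectory. The numbers below are my own toy values (not from the paper or any library), with mask = 0 where done = True, matching my dones buffer:

import numpy as np

gamma, lam = 0.99, 0.9
r      = np.array([1.0, 1.0, 1.0])   # rewards
v      = np.array([0.5, 0.4, 0.3])   # critic V(s_t)
v_next = np.array([0.4, 0.3, 0.0])   # critic V(s_{t+1})
mask   = np.array([1.0, 1.0, 0.0])   # episode terminates at t = 2

adv = np.zeros(4)                    # one extra slot so that A_3 = 0
for t in reversed(range(3)):
    delta  = r[t] + gamma * v_next[t] * mask[t] - v[t]
    adv[t] = delta + gamma * lam * adv[t + 1] * mask[t]
print(adv[:-1])  # by hand: A_2 = 0.7, A_1 = 0.897 + 0.891*0.7 ≈ 1.5207, A_0 ≈ 2.2509

The recursion matches my hand arithmetic, so I suspect the problem is elsewhere.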
Cell 3:
env = gym.make('CartPole-v1')
obs = env.reset()
agent = PPOAgent(2000, len(obs), action_space=2)
score = 0
score_hist = []
count = 0
for i in range(4000):
    prediction, action = agent.predict_action(T.tensor(obs))
    obs_next, reward, done, info = env.step(action.numpy())
    score += reward
    agent.push(obs, action, obs_next, done, prediction, reward)
    obs = obs_next
    if done:
        count += 1
        score_hist.append(score)
        print(f"{count}th episode: score {score}")
        score = 0
        env.reset()
    # update once every 10 episodes
    if count % 10 == 9 and done:
        agent.gradient()
        print()
plt.plot(score_hist)
plt.show()
I've spent days trying to implement the loss from the original PPO paper: https://arxiv.org/pdf/1707.06347. The examples I found online reach top scores within about 100 epochs, but after studying many of them I still can't figure out what's wrong with my code :(
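For reference, this is the combined objective as I read Eq. (9) of the paper: maximize L_clip - c1 * L_vf + c2 * S (the entropy bonus), i.e. minimize its negative. The function and argument names below are placeholders I made up for this sketch, not from any library; ratio, advantage, values, v_target and entropy stand for whatever the implementation computes per step:

import torch as T

def ppo_loss(ratio, advantage, values, v_target, entropy,
             eps_clip=0.2, c1=1.0, c2=1.0):
    # clipped surrogate objective
    l_clip = T.min(ratio * advantage,
                   T.clamp(ratio, 1 - eps_clip, 1 + eps_clip) * advantage)
    # squared-error value loss against the value target
    l_vf = (values - v_target) ** 2
    # the paper maximizes L_clip - c1 * L_vf + c2 * S, so minimize the negative
    return -(l_clip - c1 * l_vf + c2 * entropy).mean()

If someone can point out where my gradient() method diverges from this, that would already help a lot.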