“运行‘dqn.test’时没有执行,也没有错误消息”

问题描述 投票:0回答:1

运行代码scores = dqn.test(env, nb_episodes=100, Visualize=False)时,我遇到一个问题,执行时间很长,但没有产生任何输出或错误消息。该代码旨在测试 dqn 代理在 env 环境上 100 集的性能,而不需要可视化游戏玩法。

尽管等待了相当长的时间,但没有明显的进展或发生任何错误的迹象。代码似乎被卡住或花费过多时间才能完成。

对于可能导致此问题的原因以及如何解决该问题的任何见解或建议,我将不胜感激。

import gym
from gym import spaces
import numpy as np
import random

class EnergyOptimizationEnv(gym.Env):
    def __init__(self):
        # Définir les actions possibles : éteindre ou ne rien faire
        self.action_space = spaces.Discrete(2)
        
        # Définir l'espace d'observation : consommation énergétique
        self.observation_space = spaces.Box(low=0, high=10000, shape=(9,), dtype=np.float32)
        
        # Définir les seuils de consommation pour chaque appareil
        self.energy_ranges = {
            'lamp': (15, 100),
            'tv': (200, 400),
            'fridge': (1000, 2000)
        }
        
        # Définir le seuil critique de consommation énergétique
        self.energy_threshold = 10000
        
        # Initialiser l'état (consommation énergétique de chaque appareil)
        self.state = np.zeros(9)
        
        # Initialiser les compteurs d'appareils allumés
        self.num_lamps_on = 0
        self.num_tvs_on = 0
        self.num_fridges_on = 0
    
    def step(self, action):
        # Mettre à jour l'état en fonction de l'action choisie
        self.update_state(action)
        
        # Calculer la récompense en fonction de l'état actuel
        reward = self.calculate_reward()
        
        # Vérifier si le seuil critique de consommation est dépassé
        done = self.check_threshold()
        
        # Retourner l'état, la récompense, le booléen "done" et des informations supplémentaires
        info = {
            'num_lamps_on': self.num_lamps_on,
            'lamp_energy_consumption': self.state[0],
            'num_tvs_on': self.num_tvs_on,
            'tv_energy_consumption': self.state[1],
            'num_fridges_on': self.num_fridges_on,
            'fridge_energy_consumption': self.state[2]
        }
        return self.state, reward, done, info
    
    def update_state(self, action):
        # Mettre à jour l'état en fonction de l'action choisie
        if action == 1:  # Allumer un appareil aléatoire
            device = random.choice(list(self.energy_ranges.keys()))
            energy_range = self.energy_ranges[device]
            energy_consumption = random.uniform(energy_range[0], energy_range[1])
            if device == 'lamp':
                self.num_lamps_on += 1
                self.state[0] += energy_consumption
                self.state[3] = self.num_lamps_on
                self.state[4] = self.state[0]
            elif device == 'tv':
                self.num_tvs_on += 1
                self.state[1] += energy_consumption
                self.state[5] = self.num_tvs_on
                self.state[6] = self.state[1]
            elif device == 'fridge':
                self.num_fridges_on += 1
                self.state[2] += energy_consumption
                self.state[7] = self.num_fridges_on
                self.state[8] = self.state[2]
        else:  # Ne rien faire
            self.state[3] = self.num_lamps_on
            self.state[4] = self.state[0]
            self.state[5] = self.num_tvs_on
            self.state[6] = self.state[1]
            self.state[7] = self.num_fridges_on
            self.state[8] = self.state[2]
    
    def calculate_reward(self):
        # Calculer la récompense en fonction de l'état actuel
        if np.sum(self.state) <= self.energy_threshold:
            reward = 1
        else:
            reward = -1
        return reward
    
    def check_threshold(self):
        # Vérifier si le seuil critique de consommation est dépassé
        return np.sum(self.state) > self.energy_threshold
    
    def reset(self):
        # Réinitialiser l'état (consommation énergétique de chaque appareil)
        self.state = np.zeros(9)
        
        # Réinitialiser les compteurs d'appareils allumés
        self.num_lamps_on = 0
        self.num_tvs_on = 0
        self.num_fridges_on = 0
        
        # Retourner l'état initial
        return self.state

import numpy as np
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Flatten
from tensorflow.keras.optimizers.legacy import Adam
from tensorflow.keras.layers import Flatten
from rl.agents import DQNAgent
from rl.policy import GreedyQPolicy
from rl.memory import SequentialMemory
from rl.agents import DQNAgent
from rl.callbacks import ModelIntervalCheckpoint, FileLogger
from rl.policy import LinearAnnealedPolicy, EpsGreedyQPolicy
from rl.memory import SequentialMemory

env = EnergyOptimizationEnv()
states = env.observation_space.shape
actions = env.action_space.n

def build_model(states, actions):
    model = Sequential()
    model.add(Flatten(input_shape=(1,) + states))
    model.add(Dense(24, activation='relu'))
    model.add(Dense(24, activation='relu'))
    model.add(Dense(actions, activation='linear'))
    return model

model = build_model(states, actions)
model.summary()

def build_agent(model, actions):
    policy = LinearAnnealedPolicy(EpsGreedyQPolicy(), attr='eps', value_max=1, value_min=0.1, value_test=0.05,nb_steps=500)
    memory = SequentialMemory(limit=10000, window_length=1)
    dqn = DQNAgent(model=model, memory=memory, policy=policy, enable_double_dqn=True,nb_actions=actions, gamma=.98, nb_steps_warmup=100, target_model_update=1e-2)
    return dqn

dqn = build_agent(model, actions)
dqn.compile(Adam(lr=1e-3), metrics=['mae'])
dqn.fit(env, nb_steps=30000, log_interval=1000, nb_max_episode_steps=50, visualize=False, verbose=1)

scores = dqn.test(env, nb_episodes=100, visualize=False)
print(np.mean(scores.history['episode_reward']))

在尝试解决该问题时,我尝试了以下方法:

确保必要的库和依赖项已正确安装并且是最新的。 验证 dqn 代理和 env 环境是否设置正确并且彼此兼容。 检查代码及其文档,以确保我使用适当的参数正确使用 dqn.test 函数。 试验各种策略,包括 GreedyQPolicy 和 BoltzmannQPolicy,以评估它们对执行时间的影响。 考虑到我在Google Colab工作,考虑到该平台的潜在局限性和资源限制。 尝试将事件数减少到较小的值(例如 10),以查看在较低的工作负载下问题是否仍然存在。 使用提供的代码片段实现用于 Q 值预测的单独模型:

from rl.policy import GreedyQPolicy
from keras.models import Model
from keras.layers import Input, Reshape

scores = []
num_episodes = 100

policy = GreedyQPolicy()  # Créer une politique GreedyQPolicy

# Construire un modèle séparé pour la prédiction des valeurs Q
input_shape = env.observation_space.shape
inputs = Input(shape=input_shape)
reshape = Reshape((1,) + input_shape)(inputs)
q_values = dqn.model(reshape)
q_model = Model(inputs=inputs, outputs=q_values)

for episode in range(num_episodes):
    state = env.reset()
    done = False
    episode_reward = 0
    
    while not done:
        q_values = q_model.predict(np.expand_dims(state, axis=0))
        action = policy.select_action(q_values[0])
        next_state, reward, done, _ = env.step(action)
        episode_reward += reward
        state = next_state
    
    scores.append(episode_reward)

average_score = np.mean(scores)
print(average_score)

然而,尽管做出了这些努力,问题仍然存在。我期望 dqn.test 函数在合理的时间范围内执行并提供所需的结果,包括 100 个测试集的分数。没想到执行速度太慢或者缺少输出或者错误信息。

我正在寻求有关如何排查和解决此问题的指导,或寻求任何可用于以更有效的方式测试代理性能的替代方法。

python keras reinforcement-learning openai-gym q-learning
1个回答
0
投票

我也有同样的问题。帮助我的解决方案是将这些参数添加到我的 dqn.test 函数中:

action_repetition
nb_max_episode_steps
(https://github.com/keras-rl/keras-rl/blob/master/rl/ core.py).

  • action_repetition(整数):智能体在不再次观察环境的情况下重复相同操作的次数。如果单个操作仅对环境产生很小的影响,则将此值设置为 > 1 可能会很有用。
  • nb_max_episode_steps(整数):代理在自动重置环境之前执行的每集步骤数。如果每个情节都应运行(可能无限期地)直到环境发出终止状态信号,则设置为
    None

例如,这是我的代码修改:

scores = dqn.test(
    env,
    nb_episodes=100,
    action_repetition=1,  # edited
    nb_max_episode_steps=200,   # edited
    visualize=False,
    verbose=1
)
© www.soinside.com 2019 - 2024. All rights reserved.