我应该如何提高它的速度?强化学习 DQN

问题描述 投票:0回答:1

我是 Python 新手,这是我写的第一个 DQN 神经网络。我使用 gym 环境,让智能体(agent)学习采取最佳动作,从而在交易中获得最大利润。

我的代码目前在 CPU 上运行,每个智能体大约占用 500 MB 内存;如果改用 GPU,每个智能体大约占用 200 MB 显存。

我的问题是:如何把这些计算全部放到 GPU 上并使用并行计算?还是说我应该只使用 CPU 核心,因为在不做并行计算的情况下 CPU 反而更快(我认为这段代码目前就是这种情况)?

如果使用 GPU,可能会遇到这个错误:RuntimeError: Cannot re-initialize CUDA in forked subprocess. To use CUDA with multiprocessing, you must use the 'spawn' start method(即:要将 CUDA 与多进程结合使用,必须使用 "spawn" 启动方法)。

有人对速度有任何可能的改进吗?

import os
import pandas as pd
import numpy as np
import gym
from gym import spaces
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torch.utils.data.dataset import IterableDataset
from pytorch_lightning import LightningModule, Trainer
from typing import List, Tuple, Generator
import random
import sqlite3
import time
import subprocess as sp
import psutil

import multiprocessing as mp

 
# Module-level bookkeeping shared by the classes below.
avg_reward_list = []  # List to store average rewards
avg_total_actions = []  # List to store average total actions

episode_times = []
num_episodes = 5000

# 1 when PyTorch reports a usable CUDA device, otherwise 0.
gpu_true = 1 if torch.cuda.is_available() else 0

# Device on which tensors and models are placed.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

if not gpu_true:
    # CPU-only runs log to SQLite: open (or create) the database file.
    conn = sqlite3.connect('trading_data.db')
    cursor = conn.cursor()

    # Make sure the table exists ...
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS data (
            episode REAL,
            net_worth REAL,
            steps INTEGER,
            epsilon REAL, 
            avg_reward REAL,
            avg_total_actions REAL,
            profit_factor REAL
        )
    ''')

    # ... and start every run from an empty table.
    cursor.execute('DELETE FROM data')

    # Persist both statements, then release the connection.
    conn.commit()
    conn.close()


def get_usage():
    """Print current CPU utilisation and, when ``gpu_true`` is 1, GPU utilisation.

    GPU figures are obtained by running ``nvidia-smi`` and parsing its CSV
    output (the header row is skipped).

    Raises:
        RuntimeError: if the ``nvidia-smi`` command exits with an error.
    """
    # The load averages are queried but not reported.
    psutil.getloadavg()

    # interval=1 asks psutil to sample over one second.
    cpu_usage = psutil.cpu_percent(interval=1)

    if gpu_true != 1:
        print("The CPU usage is : ", cpu_usage)
        return

    query = "nvidia-smi --query-gpu=utilization.gpu --format=csv"
    try:
        raw_output = sp.check_output(query.split(), stderr=sp.STDOUT)
    except sp.CalledProcessError as e:
        raise RuntimeError("command '{}' return with error (code {}): {}".format(e.cmd, e.returncode, e.output))

    # Drop the CSV header (first line) and the empty string after the final newline.
    usage_rows = raw_output.decode('ascii').split('\n')[1:-1]
    # Each remaining row starts with the utilisation number.
    gpu_usage_values = [int(row.split()[0]) for row in usage_rows]

    print("The CPU usage is : ", cpu_usage, 'GPU',gpu_usage_values)



# CUDA-related environment variables for this process.
# NOTE(review): CUDA_LAUNCH_BLOCKING=1 is normally a debugging aid that makes
# kernel launches synchronous; presumably it slows GPU runs - confirm whether
# it should stay enabled outside of debugging.
os.environ.update({
    "CUDA_LAUNCH_BLOCKING": "1",
    "TORCH_USE_CUDA_DSA": "1",
})

# Define the calculate_profit function
def calculate_profit(next_price, current_price, amount_held):
    """Return the profit of holding ``amount_held`` units across one price change.

    A positive holding yields ``(current_price - next_price) * amount_held``;
    a negative holding yields ``(next_price - current_price) * abs(amount_held)``;
    anything else (a zero holding) yields ``0``.

    NOTE(review): with this sign convention a positive holding gains when the
    price falls - confirm this is the intended meaning of "held".
    """
    if amount_held > 0:
        price_move = current_price - next_price
        return price_move * amount_held
    if amount_held < 0:
        price_move = next_price - current_price
        return price_move * abs(amount_held)
    return 0

class DQN(nn.Module):
    """Two-layer fully connected network: Linear -> ReLU -> Dropout -> Linear.

    Args:
        in_states: size of the input feature vector.
        h1_nodes: width of the hidden layer.
        out_actions: number of outputs (one value per action).
        dropout_prob: dropout probability applied after the hidden activation.
    """

    def __init__(self, in_states, h1_nodes, out_actions, dropout_prob=0.0):
        super().__init__()
        self.fc1 = nn.Linear(in_states, h1_nodes)
        self.dropout = nn.Dropout(dropout_prob)
        self.fc2 = nn.Linear(h1_nodes, out_actions)

    def forward(self, x):
        """Return the network output for ``x``.

        The input is first moved to the device of the network's parameters
        and cast to float.
        """
        param_device = next(self.parameters()).device
        hidden = self.fc1(x.to(param_device).float())
        hidden = self.dropout(torch.relu(hidden))
        return self.fc2(hidden)

class CustomEnv(gym.Env):
    """Trading environment that also owns and trains its own DQN.

    ``df`` is indexed as ``df[row, column]``; column 0 is read as the price
    and column 1 as the observed feature (the caller passes a tensor whose
    columns are 'close' and 'Normalized_SMA_Slope').

    Actions are ``0``, ``1`` or ``2`` and map to a position of ``-10``, ``0``
    or ``+10`` (stored in ``total_actions``).

    Args:
        df: price/feature table, see above.
        initial_balance: net worth assigned at every ``reset()``.
        lookback_window_size: number of rows returned per observation.
    """

    def __init__(self, df, initial_balance=10000, lookback_window_size=1):
        super().__init__()
        self.df = df
        self.df_total_steps = 10000  # an episode ends once current_step reaches this
        self.initial_balance = initial_balance
        self.lookback_window_size = lookback_window_size
        self.action_space = spaces.Discrete(3)
        self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(self.lookback_window_size,), dtype=np.float32)
        self.current_step = None
        self.total_actions = 0
        self.step_count = 0
        self.current_price = 0
        self.net_worth = 0

        # Exploration / learning hyper-parameters.
        self.gamma = 0.90
        self.epsilon = 1.0
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.999998

        # NOTE(review): target_model is created here, but nothing in this class
        # copies the online weights into it - confirm whether a periodic sync
        # is intended.
        self.model = DQN(1, 80, 3).to(device)
        self.target_model = DQN(1, 80, 3).to(device)
        self.optimizer = optim.Adam(self.model.parameters(), lr=0.001)
        self.batch_size = 3000  # get_usage() is called each time step_count exceeds this
        self.training_data = []

        # Per-episode statistics; cleared in reset().
        self.episode_rewards_array = []  # reward of every step in this episode
        self.action_total_array = []  # total_actions of every step in this episode
        self.net_worth_array = []  # net worth after every step in this episode

        # Running sums of gains and losses (not reset between episodes).
        # NOTE(review): confirm whether these should also be per-episode.
        self.not_profitable_profits = 0
        self.profitable_profits = 0
        # Defined up front so the attribute exists before the first step().
        self.profit_factor = 0.0

    def reset(self):
        """Start a new episode and return its first observation."""
        self.net_worth = self.initial_balance
        self.current_step = self.lookback_window_size
        self.total_actions = random.uniform(-10, 10)
        self.current_price = self.df[self.current_step, 0]  # Assuming 'close' is the first column

        # Start the per-episode statistics from scratch. Without this the
        # lists grow for the whole run and the logged averages are cumulative
        # rather than per-episode.
        self.episode_rewards_array = []
        self.action_total_array = []
        self.net_worth_array = []

        return self._next_observation()

    def _next_observation(self):
        """Return a copy of the last ``lookback_window_size`` feature values on ``device``."""
        window_start = self.current_step - self.lookback_window_size
        feature_window = self.df[window_start:self.current_step, 1]  # Assuming 'Normalized_SMA_Slope' is the second column
        return feature_window.clone().to(device)

    def step(self, action):
        """Advance one row, apply ``action`` and return ``(obs, reward, done, info)``.

        The reward is the profit computed by ``calculate_profit`` for the
        position implied by ``action`` between the current and the next price.
        Every 100 collected transitions, ``train_dqn_batch`` is run on them.
        """
        self.current_step += 1
        self.step_count += 1
        if self.current_step >= len(self.df):
            # Ran past the end of the data: finish without trading.
            return self._next_observation(), 0, True, {}

        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

        obs = self._next_observation()
        self.total_actions = (action - 1) * 10  # Convert action index to total actions

        self.current_price = self.df[self.current_step, 0]  # Assuming 'close' is the first column
        if self.current_step + 1 < len(self.df):
            next_price = self.df[self.current_step + 1, 0]
        else:
            # Last row: there is no next price, so use the current one (zero
            # profit) instead of the previous step's stale price.
            next_price = self.current_price

        holding = self.total_actions / 10
        profit = calculate_profit(next_price, self.current_price, holding)
        self.net_worth += profit
        reward = profit

        if self.current_step >= self.df_total_steps or self.net_worth < 100:
            done = 1
        else:
            done = 0

        if self.step_count > self.batch_size:
            self.step_count = 0
            get_usage()

        self.episode_rewards_array.append(reward)
        self.action_total_array.append(self.total_actions)
        self.net_worth_array.append(self.net_worth)

        if profit > 0:
            self.profitable_profits += profit
        else:
            # profit <= 0 here, so this accumulates the magnitude of losses.
            self.not_profitable_profits -= profit

        gain_loss_ratio = (self.profitable_profits / self.not_profitable_profits) if self.not_profitable_profits != 0 else 0.0
        self.profit_factor = gain_loss_ratio - 1

        # Append data for training
        self.training_data.append((obs, action, reward))

        if len(self.training_data) >= 100:
            self.train_dqn_batch()
            self.training_data = []

        return obs, reward, done, {}

    def train_dqn_batch(self):
        """Run one optimiser step on the transitions collected in ``training_data``.

        The regression target is the immediate reward only (no bootstrapped
        next-state value is used here).
        """
        if not self.training_data:
            return

        obs_batch, action_batch, reward_batch = zip(*self.training_data)
        obs_batch = torch.stack(obs_batch)
        action_batch = torch.tensor(action_batch, dtype=torch.int64).unsqueeze(1).to(device)
        reward_batch = torch.tensor(reward_batch, dtype=torch.float32).unsqueeze(1).to(device)

        q_values = self.model(obs_batch)
        q_value = q_values.gather(1, action_batch).squeeze(1)

        # Flatten so the target has the same shape as q_value.
        expected_q_values = reward_batch.view(-1)

        loss = nn.functional.mse_loss(q_value, expected_q_values)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

class RLDataset(IterableDataset):
    """Iterable dataset that generates transitions by acting in ``env``.

    Each iteration plays ``env.df_total_steps`` steps with an epsilon-greedy
    policy and yields ``(obs, action, reward, next_obs, done)``. When an
    episode finishes on a CPU-only run (``gpu_true == 0``), one summary row
    is written to the ``data`` table of ``trading_data.db``.

    Args:
        env: environment to interact with; its ``model`` picks greedy actions.
        sample_size: stored on the instance; not used by this class.
    """

    def __init__(self, env: CustomEnv, sample_size: int = 200) -> None:
        self.epoch_count = 0
        self.env = env
        self.sample_size = sample_size

    def __iter__(self) -> Generator[Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor], None, None]:
        obs = self.env.reset().to(device)  # Move initial observation to the correct device
        for _ in range(self.env.df_total_steps):
            if random.random() < self.env.epsilon:
                action = self.env.action_space.sample()
            else:
                with torch.no_grad():  # Ensure no gradient tracking for inference
                    action = self.env.model(obs).argmax().item()
            next_obs, reward, done, _ = self.env.step(action)
            reward = torch.tensor([reward], dtype=torch.float32).to(device)
            next_obs = next_obs.to(device)
            yield obs.view(1), torch.tensor([action]).to(device), reward.squeeze(), next_obs.view(1), done
            if done == 1:
                if gpu_true == 0:
                    self._log_episode()
                obs = self.env.reset().to(device)
            else:
                obs = next_obs

    def _log_episode(self) -> None:
        """Record the finished episode's summary in the SQLite database."""
        self.epoch_count += 1

        # float() accepts 0-d tensors, NumPy scalars and plain numbers alike.
        # sqlite3 can only bind built-in Python types, and net_worth /
        # profit_factor are plain numbers (no .item()) until a non-zero profit
        # has been added to them.
        avg_reward = float(np.mean(self.env.episode_rewards_array))
        avg_reward_list.append(avg_reward)
        mean_total_actions = float(np.mean(self.env.action_total_array))
        profit_factor = float(getattr(self.env, "profit_factor", 0.0))
        net_worth = float(self.env.net_worth)

        conn = sqlite3.connect('trading_data.db')
        try:
            cursor = conn.cursor()
            cursor.execute('''
                        INSERT INTO data (episode, net_worth, steps, epsilon, avg_reward, avg_total_actions, profit_factor)
                        VALUES (?, ?, ?, ?, ?, ?, ?)
                    ''', (self.epoch_count, net_worth, self.env.current_step, self.env.epsilon, avg_reward, mean_total_actions, profit_factor))
            conn.commit()
        finally:
            # Always release the connection, even if the insert fails.
            conn.close()

class DQNLit(LightningModule):
    """Lightning wrapper that trains the environment's DQN on ``RLDataset`` batches.

    ``model`` and ``target_net`` are the environment's own ``model`` and
    ``target_model`` objects (shared, not copied).

    NOTE(review): nothing in this class copies weights from ``model`` into
    ``target_net`` - confirm whether a periodic sync is intended.
    """

    def __init__(self, env: CustomEnv, batch_size) -> None:
        super().__init__()
        self.env = env
        self.model = self.env.model
        self.target_net = self.env.target_model
        self.batch_size = batch_size

    def forward(self, x):
        """Return the online network's output for ``x`` cast to float."""
        inputs = x.float()
        return self.model(inputs)

    def dqn_mse_loss(self, batch: Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]) -> torch.Tensor:
        """Return the MSE between predicted Q-values and one-step TD targets.

        The target is ``reward + (1 - done) * gamma * max(target_net(next_obs))``
        and is detached so no gradient flows through it.
        """
        states, actions, rewards, next_states, dones = batch

        # Put everything on the training device; value tensors become float32.
        states = states.to(device).float()
        actions = actions.squeeze(1).to(device)  # drop the extra dimension
        rewards = rewards.to(device).float()
        next_states = next_states.to(device).float()
        dones = dones.to(device).float()

        # Q-value of the action actually taken, one column per sample.
        all_q = self.model(states)
        best_next_q, _ = self.target_net(next_states).max(1)

        # One-step target, reshaped to a column to match the gathered Q-values.
        td_target = rewards + (1 - dones) * self.env.gamma * best_next_q
        td_target = td_target.view(-1, 1)

        chosen_q = all_q.gather(1, actions.unsqueeze(1))
        criterion = nn.MSELoss()
        return criterion(chosen_q, td_target.detach())

    def training_step(self, batch: Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor], batch_idx: int):
        """Return the loss for one batch."""
        return self.dqn_mse_loss(batch)

    def configure_optimizers(self):
        """Return an Adam optimiser over the online network's parameters."""
        online_params = self.model.parameters()
        return optim.Adam(online_params, lr=0.001)

    def train_dataloader(self):
        """Return a DataLoader over a fresh ``RLDataset`` for the environment."""
        dataset = RLDataset(self.env)
        return DataLoader(dataset, batch_size=self.batch_size)




def train_agent():
    """Load the price data, build the environment and train one DQN agent.

    The pickle is read from the Google Drive path first and from the local
    ``data/`` directory if that file cannot be opened. A normalised SMA-slope
    feature is derived from ``close``; the resulting two-column table is
    moved to ``device`` and handed to ``CustomEnv``.
    """
    colab_path = "/content/drive/MyDrive/Copy of binance-BTCUSDT-1m.pkl"
    local_path = "data/binance-BTCUSDT-1m.pkl"
    # Only a missing/unreadable file triggers the fallback; any other error
    # (e.g. a corrupt pickle) is reported instead of being silently masked.
    try:
        df = pd.read_pickle(colab_path)
    except OSError:
        df = pd.read_pickle(local_path)
    else:
        print('started')
    df = df.sort_values('date_open')

    window_size = 10
    df['SMA'] = df['close'].rolling(window=window_size).mean()
    sma_slope = df['SMA'] - df['SMA'].shift(1)
    window_long = window_size * 10
    sma_slope_long = df['SMA'] - df['SMA'].shift(window_long)
    stdev_sma_slope_long = sma_slope_long.rolling(window=window_long, min_periods=1).std()
    df['Normalized_SMA_Slope'] = sma_slope / stdev_sma_slope_long

    # Keep only the two columns the environment reads, drop rows with NaN and
    # convert to float32. dropna() returns a new frame, which avoids an
    # in-place modification of a column slice.
    df = df[['close', 'Normalized_SMA_Slope']].dropna().astype(np.float32)

    lookback_window_size = 1

    # Convert DataFrame to a PyTorch tensor on the training device
    df = torch.tensor(df.values, dtype=torch.float32).to(device)

    # Pass the PyTorch tensor to CustomEnv
    train_env = CustomEnv(df, lookback_window_size=lookback_window_size)
    batch_size = 1000

    model = DQNLit(train_env, batch_size)

    # Run the Trainer on the same device the tensors and models were placed
    # on. A CPU trainer moves the module to the CPU while the batches are sent
    # to `device`, which fails with a device mismatch on CUDA machines.
    accelerator = 'gpu' if device.type == 'cuda' else 'cpu'
    trainer = Trainer(accelerator=accelerator, devices=1, max_epochs=1000)
    trainer.fit(model)

def train_worker(train_func):
    """Call ``train_func`` with no arguments and discard its result.

    Used as the function mapped over the worker pool.
    """
    train_func()
    return None

if __name__ == '__main__':
    # Number of training agents to run in parallel (one worker process each).
    num_agents = 1

    # CUDA cannot be used in a forked child once the parent has touched it
    # ("Cannot re-initialize CUDA in forked subprocess"), so use the "spawn"
    # start method whenever a GPU is available. CPU-only runs keep the
    # platform's default start method.
    start_method = "spawn" if torch.cuda.is_available() else None
    mp_context = mp.get_context(start_method)

    # Create a pool of worker processes
    pool = mp_context.Pool(processes=num_agents)
    try:
        # Run one training job per agent; blocks until all have finished.
        pool.map(train_worker, [train_agent] * num_agents)
    finally:
        # Stop accepting work and wait for the workers to exit cleanly.
        pool.close()
        pool.join()

python python-3.x pytorch
1个回答
0
投票

我手动对代码做了性能分析,看看哪一部分的计算耗时最长。我用下面这样的代码来测量每个函数所花费的时间。我也尝试过

import line_profiler
但我无法让该模块或替代模块工作。

import time

dqn_mse_loss_timer = []  # elapsed seconds of every measured call

# perf_counter() is a monotonic, high-resolution clock meant for measuring
# short durations; time.time() can jump when the system clock is adjusted.
start_time = time.perf_counter()
# ... place the call being measured here (e.g. the dqn_mse_loss call) ...
end_time = time.perf_counter()
elapsed_time = end_time - start_time
dqn_mse_loss_timer.append(elapsed_time)

sum_dqn_mse_loss_timer = sum(dqn_mse_loss_timer)
print("sum_dqn_mse_loss_timer:", sum_dqn_mse_loss_timer, "seconds")
© www.soinside.com 2019 - 2024. All rights reserved.