PyTorch Lightning RuntimeError: CUDA error: initialization error; CPU works fine

Problem description

My Python skills are weak and I'm new to PyTorch. The problem is that I can run this code on the CPU by setting accelerator = 'cpu', but when I switch it to the GPU the code gets stuck at _ = iter(train_loader). I don't have an Nvidia GPU myself, so I've been trying to run it on Google Colab, but it doesn't work on a T4 GPU either, because it hangs at the same _ = iter(train_loader) call.

RuntimeError: CUDA error: initialization error. You can easily run this code yourself if you have the required modules installed.

If you know how to use GPU acceleration with PyTorch Lightning, please help.
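For reference, a minimal sketch of how GPU training is normally requested from the Lightning Trainer (the devices=1 argument is an illustrative addition, not part of the script below):

trainer = Trainer(accelerator='gpu', devices=1)  # run on a single GPU
trainer.fit(model, train_loader)                 # Lightning moves the model to the GPU

The full script that reproduces the error follows.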

import os
import pandas as pd
import numpy as np
import gym
from gym import spaces
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torch.utils.data.dataset import IterableDataset
from pytorch_lightning import LightningModule, Trainer
from typing import List, Tuple, Generator
import random
import multiprocessing as mp



# CUDA-related environment variables: CUDA_LAUNCH_BLOCKING=1 makes kernel launches
# synchronous so errors surface at the failing call, and TORCH_USE_CUDA_DSA enables
# device-side assertions (only effective if PyTorch was built with them)
os.environ["CUDA_LAUNCH_BLOCKING"] = "1"
os.environ["TORCH_USE_CUDA_DSA"] = "1"
print(torch.cuda.is_available())

avg_reward_list = []  # List to store average rewards
avg_total_actions = []  # List to store average total actions

episode_times = []
num_episodes = 5000


# Define the calculate_profit function
def calculate_profit(next_price, current_price, amount_held):
    if amount_held > 0:
        return (current_price - next_price) * amount_held
    elif amount_held < 0:
        return (next_price - current_price) * abs(amount_held)
    else:
        return 0

# Check if CUDA (GPU) is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
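# NOTE: `device` is defined but never used below; Lightning moves the
# LightningModule to the accelerator on its own, while the DQN instances
# owned by CustomEnv stay on the CPU.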


class DQN(nn.Module):
    def __init__(self, in_states, h1_nodes, out_actions, dropout_prob=0.0):  
        super().__init__()
        self.fc1 = nn.Linear(in_states, h1_nodes)
        self.dropout = nn.Dropout(dropout_prob)
        self.fc2 = nn.Linear(h1_nodes, out_actions)

    def forward(self, x):
        x = torch.relu(self.fc1(x.float()))
        x = self.dropout(x)
        x = self.fc2(x)
        return x

class CustomEnv(gym.Env):
    def __init__(self, df, initial_balance=10000, lookback_window_size=1):
        super(CustomEnv, self).__init__()
        self.df = df.dropna().reset_index()
        self.df_total_steps = 100
        self.initial_balance = initial_balance
        self.lookback_window_size = lookback_window_size
        self.action_space = spaces.Discrete(3)  
        self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(self.lookback_window_size,), dtype=np.float32)
        self.current_step = None
        self.total_actions = 0
        self.current_price = 0
        self.net_worth = 0
        self.avg_total_actions = 0
        self.avg_total_actionss = 0
        self.not_profitable_profits = 0
        self.profitable_profits = 0
        self.step_count = 0
        self.avg_total_actionss_array = 0
        self.profit_factor = 0
        

        self.episode_rewards_array = []  # List to store rewards for this episode
        self.action_total_array = []  # List to store total actions for this episode
        self.net_worth_array = []  # List to store net worth for this episode

        self.gamma = 0.90
        self.epsilon = 1.0
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.999999
        h1_nodes = 80
        in_states = 1
        out_actions = 3
        learning_rate = 0.001
        self.dropout_prob=0.5
        self.model = DQN(in_states, h1_nodes, out_actions, dropout_prob=self.dropout_prob)
        self.target_model = DQN(in_states, h1_nodes, out_actions, dropout_prob=self.dropout_prob)
        self.optimizer = optim.Adam(self.model.parameters(), lr=learning_rate)
        self.batch_size = 1000
        # Replay buffer used by remember()/sample_batch(); the capacity value is
        # assumed here, it was not set anywhere in the original code.
        self.memory = []
        self.buffer_size = 10000

    def remember(self, experience):
        self.memory.append(experience)
        if len(self.memory) > self.buffer_size:
            self.memory.pop(0)  # Remove oldest experience if buffer exceeds size

    def sample_batch(self, batch_size):
        return random.sample(self.memory, min(len(self.memory), batch_size))

    def reset(self):
        self.net_worth = self.initial_balance
        self.current_step = self.lookback_window_size
        self.total_actions = random.uniform(-10, 10)
        self.current_price = self.df.loc[self.current_step, 'close']
        return self._next_observation()

    def _next_observation(self):
        close_prices = self.df.loc[self.current_step - self.lookback_window_size:self.current_step - 1, 'Normalized_SMA_Slope'].values
        return torch.tensor(close_prices, dtype=torch.float32)

    def step(self, action):
        self.step_count += 1
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

        self.current_step += 1
        if self.current_step >= len(self.df):
            return self._next_observation(), 0, True, {}

        obs = self._next_observation()

        self.total_actions = (action - 1) * 10  # Convert action index to total actions

        next_price = self.df.loc[self.current_step + 1, 'close'] if self.current_step + 1 < len(self.df) else self.current_price
        self.current_price = self.df.loc[self.current_step, 'close']
        holding = self.total_actions / 10
        profit = calculate_profit(next_price, self.current_price, holding)
        self.net_worth += profit

        reward = profit

        if self.current_step >= self.df_total_steps or self.net_worth < 100:
            done = 1
        else:
            done = 0

        self.avg_total_actions = np.mean(self.total_actions)

        self.train_dqn(obs, action, reward, done)

        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay
        if profit > 0: #and avg_action < 1 and avg_action > -1:
            #self.profitable_profits.append(profit)
            self.profitable_profits += profit 
            #profit = profit*2
        else:
            self.not_profitable_profits -= profit

        self.profit_factor = ((self.profitable_profits / self.not_profitable_profits) if self.not_profitable_profits != 0 else 0.0) - 1

        if self.step_count > self.batch_size:
            self.target_model.load_state_dict(self.model.state_dict())
            self.step_count = 0
        #print(f'Step: {self.current_step}, Net Worth: {round(self.net_worth, 1)}, PRICE: {round(self.current_price, 1)}, action: {round(self.total_actions, 1)}, Profit Factor: {self.profit_factor}')
        #print(f'working!')

        self.episode_rewards_array.append(reward)
        self.action_total_array.append(self.total_actions)
        self.net_worth_array.append(self.net_worth)

        return obs, reward, done, {}


    def train_dqn(self, obs, action, reward, done): #nothing wrong here 
        obs = obs.clone().detach().unsqueeze(0).float()  # obs is already a tensor
        action = torch.tensor([action], dtype=torch.int64)
        reward = torch.tensor([reward], dtype=torch.float32).requires_grad_()
        next_obs = obs
        done = torch.tensor([done], dtype=torch.float32).requires_grad_()

        # Reshape obs to match the input size expected by fc1 layer
        obs = obs.view(-1, self.lookback_window_size)  

        q_values = self.model(obs)
        next_q_values = self.model(next_obs)
        q_value = q_values.gather(1, action.unsqueeze(1)).squeeze(1)
        next_q_value = next_q_values.max(1)[0].detach()  # Detach next_q_value
        expected_q_value = reward + self.gamma * next_q_value * (1.0 - done)

        loss = nn.functional.mse_loss(q_value, expected_q_value)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()




class RLDataset(IterableDataset):
    def __init__(self, env: CustomEnv, sample_size: int = 200) -> None:
        self.epoch_count = 0
        self.env = env
        self.sample_size = sample_size
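
    # NOTE: when the DataLoader is created with num_workers > 0, __iter__ below
    # runs inside a worker subprocess; re-initializing CUDA in a process forked
    # after CUDA was already set up in the parent is a common source of
    # "CUDA error: initialization error".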

    def __iter__(self) -> Generator[Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor], None, None]:
        obs = self.env.reset()
        for _ in range(self.env.df_total_steps):
            if random.random() < self.env.epsilon:  # Ensure to use self.env.epsilon
                action = self.env.action_space.sample()  # Random action
            else:
                action = self.env.model(torch.FloatTensor(obs)).argmax().item()
            next_obs, reward, done, _ = self.env.step(action)
            yield obs, action, reward, next_obs, done
            if done==1:
                self.epoch_count += 1
                # Calculate average reward for this episode
                avg_reward = np.mean(self.env.episode_rewards_array)
                avg_reward_list.append(avg_reward)  # Append average reward to list
                avg_total_actions = np.mean(self.env.action_total_array)

                # #print(f" End of episode {self.epoch_count}! Net Worth={round(self.env.net_worth, 2)}! Epsilon={self.env.epsilon} Avg Reward={avg_reward}, Avg Position={avg_total_actions}, profit factor={self.env.profit_factor}")
                # # Save episode data to the database
                # conn = sqlite3.connect('trading_data.db')
                # cursor = conn.cursor()
                # rounded_net_worth = round(self.env.net_worth, 2)
                # cursor.execute('''
                #     INSERT INTO data (episode, net_worth, steps, epsilon, avg_reward, avg_total_actions, profit_factor)
                #     VALUES (?, ?, ?, ?, ?, ?, ?)
                # ''', (self.epoch_count, rounded_net_worth, self.env.current_step, self.env.epsilon, avg_reward, avg_total_actions, self.env.profit_factor))
                # conn.commit()
                # conn.close()

                obs = self.env.reset()
            else:
                obs = next_obs

class DQNLit(LightningModule):
    def __init__(self, env: CustomEnv, batch_size) -> None:
        super().__init__()
        self.env = env
        self.batch_size = batch_size  # stored so train_dataloader() can use it

        h1_nodes = 80
        in_states = 1
        out_actions = 3
        self.dropout_prob = 0.5  # this kinda broke everything?
        self.net = DQN(in_states, h1_nodes, out_actions, dropout_prob=self.dropout_prob)
        self.target_net = DQN(in_states, h1_nodes, out_actions, dropout_prob=self.dropout_prob)

    def forward(self, x):
        return self.net(x.float())
    
    def dqn_mse_loss(self, batch: Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]) -> torch.Tensor:
        obs, action, reward, next_obs, done = batch
        
        # Convert reward and done to Float data type
        reward = reward.float()
        done = done.float()
        
        q_values = self.net(obs)
        next_q_values = self.target_net(next_obs).max(1)[0]
        expected_q_values = reward + (1 - done) * self.env.gamma * next_q_values
        
        return nn.MSELoss()(q_values.gather(1, action.unsqueeze(1)).squeeze(1), expected_q_values.detach())

    def training_step(self, batch: Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor], batch_idx: int):
        loss = self.dqn_mse_loss(batch)
        return loss

    def configure_optimizers(self):
        return optim.Adam(self.net.parameters(), lr=0.001)

    def train_dataloader(self):
        num_workers = 1  # Choose the number of workers you want
        return DataLoader(RLDataset(self.env), batch_size=self.batch_size, num_workers=num_workers, persistent_workers=True)




def main():
    num_rows = 2000
    # Create synthetic data
    data = {
        'close': np.full(num_rows, 100),  # Set close to 100 for all rows
        #'Normalized_SMA_Slope': np.full(num_rows, 0.1)  # Set Normalized_SMA_Slope to 0.1 for all rows
    }

    # Create a DataFrame from the synthetic data
    df = pd.DataFrame(data)

    # Set a specific value for index 0
    specific_value = 50
    df.at[0, 'close'] = specific_value

    # Calculate Simple Moving Average (SMA)
    window_size = 10
    df['SMA'] = df['close'].rolling(window=window_size).mean()

    # Calculate the Simple Moving Average slope for normalization
    sma_slope = df['SMA'] - df['SMA'].shift(1)

    # SMA slope for normalization
    window_long = window_size * 10
    sma_slope_long = df['SMA'] - df['SMA'].shift(window_long)

    # Calculate the standard deviation of long-term SMA slope
    stdev_sma_slope_long = sma_slope_long.rolling(window=window_long, min_periods=1).std()

    # Normalize the Simple Moving Average slope
    normalized_sma_slope = sma_slope / stdev_sma_slope_long

    # Add the normalized_sma_slope to df as a new column
    df['Normalized_SMA_Slope'] = normalized_sma_slope

    # Define the lookback window size
    lookback_window_size = 1


    # Initialize the environment with the dataset and lookback window size
    train_df = df[:-lookback_window_size]
    train_env = CustomEnv(train_df, lookback_window_size=lookback_window_size)
    batch_size = 1000
    print('started1')
    train_dataset = RLDataset(train_env)
    print('started2')
    train_loader = DataLoader(train_dataset, batch_size=batch_size, num_workers=1)
    print('started3')

    model = DQNLit(train_env, batch_size)

    _ = iter(train_loader)  # Call iter(train_loader) to avoid the RuntimeError
    trainer = Trainer(accelerator='gpu')#, max_epochs=200, strategy='ddp_notebook')
    print('started4')
    trainer.fit(model, train_loader)
    print('started5')

if __name__ == '__main__':
    # Required on Windows when using multiprocessing
    #mp.set_start_method("spawn")
    main()











python pytorch reinforcement-learning pytorch-lightning
1 Answer

train_loader = DataLoader(train_dataset,batch_size=batch_size)#,num_workers=1

This line was the problem: num_workers=1 creates two instances (the main process plus one worker), and that is what caused the error. However, setting it to num_workers=0 raised an error saying num_workers needs to be greater than 0, which seems to be a bug. So remove the num_workers argument entirely and it works fine, since that is essentially the same as num_workers = 0.
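Below is a minimal sketch of the corrected calls, assuming the rest of the question's script stays unchanged; the devices=1 argument is an assumption for a single T4. As an aside, the persistent_workers=True passed inside DQNLit.train_dataloader requires num_workers > 0, which may be where the "needs to be greater than 0" message came from.

# Default num_workers (0): data loading stays in the main process,
# so no forked subprocess tries to re-initialize CUDA.
train_loader = DataLoader(train_dataset, batch_size=batch_size)

trainer = Trainer(accelerator='gpu', devices=1)
trainer.fit(model, train_loader)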
