我一直在尝试使这个简单的 lstm 代码起作用,但数据加载器确实让我感到困惑,我已经实现了多个版本,但 72 的窗口和 12 的预测不适用于数据加载器。你能告诉我出了什么问题吗?
数据集
class TimeSeriesDataset(Dataset):
def __init__(self, csv_file, input_seq_length=72, output_seq_length=12, train=True):
self.data = pd.read_csv(csv_file) # Load CSV file
self.input_seq_length = input_seq_length
self.output_seq_length = output_seq_length
self.train = train
# Normalize data
self.scaler = MinMaxScaler()
self.data[['column4']] = self.scaler.fit_transform(self.data[['column4']])
def __len__(self):
return len(self.data) - self.input_seq_length - self.output_seq_length + 1 # Adjusted length to exclude incomplete sequences
def __getitem__(self, idx):
if self.train:
idx += np.random.randint(0, self.input_seq_length) # Randomize training data
input_data = self.data.iloc[idx:idx+self.input_seq_length].values
target = self.data.iloc[idx+self.input_seq_length:idx+self.input_seq_length+self.output_seq_length]['column4'].values
# Pad sequences
input_data = [torch.tensor(sequence, dtype=torch.float) for sequence in input_data]
input_data = pad_sequence(input_data, batch_first=True)
return input_data, torch.tensor(target, dtype=torch.float)
为了重现性
LSTM
# Define LSTM model
class LSTMModel(nn.Module):
def __init__(self, input_size, hidden_size, output_size, num_layers=1):
super(LSTMModel, self).__init__()
self.hidden_size = hidden_size
self.num_layers = num_layers
self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
self.fc = nn.Linear(hidden_size, output_size)
def forward(self, x):
h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
out, _ = self.lstm(x, (h0, c0))
out = self.fc(out[:, -1, :])
return out
训练循环
# Define training function
def train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=100):
train_losses = []
val_losses = []
for epoch in range(num_epochs):
model.train()
train_loss = 0.0
for inputs, targets in train_loader:
print(epoch, inputs.shape, targets.shape)
optimizer.zero_grad()
outputs = model(inputs)
loss = criterion(outputs, targets)
loss.backward()
optimizer.step()
train_loss += loss.item()
train_losses.append(train_loss / len(train_loader))
model.eval()
val_loss = 0.0
with torch.no_grad():
for inputs, targets in val_loader:
outputs = model(inputs)
loss = criterion(outputs, targets)
val_loss += loss.item()
val_losses.append(val_loss / len(val_loader))
print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_losses[-1]}, Val Loss: {val_losses[-1]}')
return train_losses, val_losses
# Define function to plot losses
def plot_losses(train_losses, val_losses):
fig = go.Figure()
fig.add_trace(go.Scatter(x=list(range(len(train_losses))), y=train_losses, mode='lines', name='Train Loss'))
fig.add_trace(go.Scatter(x=list(range(len(val_losses))), y=val_losses, mode='lines', name='Val Loss'))
fig.update_layout(title='Training and Validation Losses', xaxis_title='Epoch', yaxis_title='Loss')
fig.show()
主要
# Main function
def main():
# Load data
dataset = TimeSeriesDataset('sample_data.csv')
# Split data into train, validation, and test sets
train_size = int(0.6 * len(dataset))
val_size = int(0.2 * len(dataset))
test_size = len(dataset) - train_size - val_size
train_data, val_data, test_data = torch.utils.data.random_split(dataset, [train_size, val_size, test_size])
# Create data loaders
train_loader = DataLoader(train_data, batch_size=64, shuffle=True)
val_loader = DataLoader(val_data, batch_size=64)
test_loader = DataLoader(test_data, batch_size=64)
# Initialize model, loss function, and optimizer
model = LSTMModel(input_size=dataset.data.shape[1], hidden_size=64, output_size=1)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
# Train model
train_losses, val_losses = train_model(model, train_loader, val_loader, criterion, optimizer)
# Plot losses
plot_losses(train_losses, val_losses)
# Evaluate model on test data
model.eval()
test_loss = 0.0
with torch.no_grad():
for inputs, targets in test_loader:
outputs = model(inputs)
loss = criterion(outputs, targets)
test_loss += loss.item()
print(f'Test Loss: {test_loss / len(test_loader)}')
# Run main function
if __name__ == "__main__":
main()
生成样本数据
import pandas as pd
import numpy as np
import datetime
# Generate sample data
num_rows = 1200
start_date = datetime.datetime(2024, 1, 1)
time_index = [start_date + datetime.timedelta(minutes=5*i) for i in range(num_rows)]
column1 = np.random.randn(num_rows) * 10 # Sample values for column 1
column2 = np.random.randn(num_rows) * 100 # Sample values for column 2
column3 = np.random.randn(num_rows) * 1000 # Sample values for column 3
column4 = np.random.randn(num_rows) * 10000 # Sample values for column 4
# Create DataFrame
data = {
# 'datetime': time_index,
'column1': column1.astype(float),
'column2': column2.astype(float),
'column3': column3.astype(float),
'column4': column4.astype(float)
}
df = pd.DataFrame(data)
# Save to CSV
df.to_csv('sample_data.csv', index=False)
这个问题是随机化超出了数据长度本身
if self.train:
idx += np.random.randint(0, self.input_seq_length) # Randomize training data
已删除或更新获取项目
def __len__(self):
return len(self.data) - self.input_seq_length - self.output_seq_length - 72 + 1 # Adjusted length to exclude incomplete sequences