I created code with a NN that is supposed to work with the california_housing dataset, but it doesn't work. I've checked all the code and still have no solution; I'm confused. Maybe someone can help me? Code:
import torch.nn.functional as F
import torch.nn as nn
import torch
import pandas as pd
import sklearn.datasets
import matplotlib.pyplot as plt
import math
from sklearn.model_selection import train_test_split
data = sklearn.datasets.fetch_california_housing(as_frame=True)
X_train, X_test, y_train, y_test = train_test_split(data.data, data.target, test_size=0.25, random_state=13)
class MyDataset(torch.utils.data.Dataset):
    def __init__(self, x, y):
        self.features = x
        self.target = y
        self.fv = self.features.values
        self.tv = self.target.values
        self.f_tensor = torch.tensor(self.fv, dtype=torch.float32)
        self.t_tensor = torch.tensor(self.tv, dtype=torch.float32).reshape(-1, 1)

    def __len__(self):
        return len(self.features)

    def __getitem__(self, idx):
        return self.f_tensor[idx], self.t_tensor[idx]

train_dataset = MyDataset(X_train, y_train)
train_loader = torch.utils.data.DataLoader(train_dataset,
                                           batch_size=128,
                                           shuffle=True, drop_last=True)
test_dataset = MyDataset(X_test, y_test)
test_loader = torch.utils.data.DataLoader(test_dataset,
                                          batch_size=16,
                                          shuffle=False, drop_last=True)
class FeedForward(nn.Module):
    def __init__(self, input_dim, hidden_dims, output_dim=1):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dims[0])
        self.bn1 = nn.BatchNorm1d(hidden_dims[0])
        self.dropout1 = nn.Dropout(0.2)
        self.fc2 = nn.Linear(hidden_dims[0], hidden_dims[1])
        self.bn2 = nn.BatchNorm1d(hidden_dims[1])
        self.dropout2 = nn.Dropout(0.2)
        self.fc3 = nn.Linear(hidden_dims[1], hidden_dims[2])
        self.bn3 = nn.BatchNorm1d(hidden_dims[2])
        self.dropout3 = nn.Dropout(0.2)
        self.fc4 = nn.Linear(hidden_dims[2], hidden_dims[3])
        self.bn4 = nn.BatchNorm1d(hidden_dims[3])
        self.dropout4 = nn.Dropout(0.2)
        self.fc5 = nn.Linear(hidden_dims[3], output_dim)

    def forward(self, x):
        x = self.fc1(x)
        x = self.bn1(x)
        x = F.relu(x)
        x = self.dropout1(x)
        x = self.fc2(x)
        x = self.bn2(x)
        x = F.relu(x)
        x = self.dropout2(x)
        x = self.fc3(x)
        x = self.bn3(x)
        x = F.relu(x)
        x = self.dropout3(x)
        x = self.fc4(x)
        x = self.bn4(x)
        x = F.relu(x)
        x = self.dropout4(x)
        x = self.fc5(x)
        return x
def train(model, optimizer, criterion, train_loader, device):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
        _, predicted = outputs.max(1)
        total += labels.size(0)
        correct += predicted.eq(labels).sum().item()
    train_loss = running_loss / len(train_loader)
    train_acc = correct / total
    return train_loss, train_acc

def test(model, criterion, test_loader, device):
    model.eval()
    running_loss = 0.0
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            running_loss += loss.item()
            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()
    test_loss = running_loss / len(test_loader)
    test_acc = correct / total
    return test_loss, test_acc

def train_model(model, optimizer, criterion, train_loader, test_loader, device, num_epochs=10):
    train_losses = []
    train_accs = []
    test_losses = []
    test_accs = []
    for epoch in range(num_epochs):
        train_loss, train_acc = train(model, optimizer, criterion, train_loader, device)
        test_loss, test_acc = test(model, criterion, test_loader, device)
        train_losses.append(train_loss)
        train_accs.append(train_acc)
        test_losses.append(test_loss)
        test_accs.append(test_acc)
        print(f"Epoch [{epoch+1}/{num_epochs}]: Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, "
              f"Test Loss: {test_loss:.4f}, Test Acc: {test_acc:.4f}")
    return train_losses, train_accs, test_losses, test_accs
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
hidden_dims = [100, 64, 32, 16]
model = FeedForward(8, hidden_dims)
model = model.to(device)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
train_losses, train_accs, test_losses, test_accs = train_model(
    model, optimizer, criterion, train_loader, test_loader, device)
plt.figure(figsize=(12, 4))
plt.subplot(1, 2, 1)
plt.plot(train_losses, label="Train Loss")
plt.plot(test_losses, label="Test Loss")
plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.legend()
plt.subplot(1, 2, 2)
plt.plot(train_accs, label="Train Accuracy")
plt.plot(test_accs, label="Test Accuracy")
plt.xlabel("Epochs")
plt.ylabel("Accuracy")
plt.legend()
plt.show()
I've tried changing: the number of layers, the optimizer, the optimizer parameters, the number of epochs, the learning rate, and so on...
Your training loss is going down, so you are on the right track. The problem is mainly in how the validation metric is computed.
The code originally uses predicted.eq(labels) to test whether values are exactly equal - but since this is a regression problem, the values will never match exactly (they would have to agree down to the last decimal place). On top of that, outputs.max(1) takes the argmax over a single output column, so predicted is always index 0. In short, the code applies a classification accuracy metric to a regression problem.
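To see why that breaks, note that with output_dim=1 the model output has shape (batch, 1), so the argmax over dim=1 is always 0, which is then compared against float targets. A quick sketch with made-up values:

import torch

outputs = torch.tensor([[2.1], [0.9], [3.4]])      # regression predictions, shape (3, 1)
_, predicted = outputs.max(1)                      # argmax over a single column
print(predicted)                                   # tensor([0, 0, 0])

labels = torch.tensor([[2.068], [0.972], [3.413]])
print(predicted.eq(labels).sum().item())           # 0 -- "accuracy" stays at zero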
Since this is a regression problem, the goal is to get the predictions as close to the targets as possible, and the error can be measured with the mean absolute error (MAE), which tells you how far off the predictions are on average.
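As a sketch, MAE is just the mean of the absolute differences (values below are made up; the target here is the median house value in units of $100,000):

import torch

preds = torch.tensor([[2.5], [0.8], [3.0]])
targets = torch.tensor([[2.0], [1.0], [3.5]])

mae = (preds - targets).abs().mean()   # (0.5 + 0.2 + 0.5) / 3
print(mae.item())                      # ~0.4, i.e. on average $40k off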
I modified the code and it runs as expected. Neural networks generally need scaled inputs; scaling helps stabilize training and improves convergence, so I also added feature scaling to the code.
import torch.nn.functional as F
import torch.nn as nn
import torch
import pandas as pd
import sklearn.datasets
import matplotlib.pyplot as plt
import math
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
data = sklearn.datasets.fetch_california_housing(as_frame=True)
X_train_orig, X_test_orig, y_train, y_test = train_test_split(data.data, data.target, test_size=0.25, random_state=13)
# Fit the scaler on the training data only, then standardize both splits:
# each feature becomes z = (x - mean) / std (zero mean, unit variance)
scaler = StandardScaler().fit(X_train_orig).set_output(transform='pandas')
X_train = scaler.transform(X_train_orig)
X_test = scaler.transform(X_test_orig)
class MyDataset(torch.utils.data.Dataset):
    def __init__(self, x, y):
        self.features = x
        self.target = y
        self.fv = self.features.values
        self.tv = self.target.values
        self.f_tensor = torch.tensor(self.fv, dtype=torch.float32)
        self.t_tensor = torch.tensor(self.tv, dtype=torch.float32).reshape(-1, 1)

    def __len__(self):
        return len(self.features)

    def __getitem__(self, idx):
        return self.f_tensor[idx], self.t_tensor[idx]

train_dataset = MyDataset(X_train, y_train)
train_loader = torch.utils.data.DataLoader(train_dataset,
                                           batch_size=128,
                                           shuffle=True, drop_last=True)
test_dataset = MyDataset(X_test, y_test)
test_loader = torch.utils.data.DataLoader(test_dataset,
                                          batch_size=16,
                                          shuffle=False, drop_last=True)
class FeedForward(nn.Module):
    def __init__(self, input_dim, hidden_dims, output_dim=1):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dims[0])
        self.bn1 = nn.BatchNorm1d(hidden_dims[0])
        self.dropout1 = nn.Dropout(0.2)
        self.fc2 = nn.Linear(hidden_dims[0], hidden_dims[1])
        self.bn2 = nn.BatchNorm1d(hidden_dims[1])
        self.dropout2 = nn.Dropout(0.2)
        self.fc3 = nn.Linear(hidden_dims[1], hidden_dims[2])
        self.bn3 = nn.BatchNorm1d(hidden_dims[2])
        self.dropout3 = nn.Dropout(0.2)
        self.fc4 = nn.Linear(hidden_dims[2], hidden_dims[3])
        self.bn4 = nn.BatchNorm1d(hidden_dims[3])
        self.dropout4 = nn.Dropout(0.2)
        self.fc5 = nn.Linear(hidden_dims[3], output_dim)

    def forward(self, x):
        x = self.fc1(x)
        x = self.bn1(x)
        x = F.relu(x)
        x = self.dropout1(x)
        x = self.fc2(x)
        x = self.bn2(x)
        x = F.relu(x)
        x = self.dropout2(x)
        x = self.fc3(x)
        x = self.bn3(x)
        x = F.relu(x)
        x = self.dropout3(x)
        x = self.fc4(x)
        x = self.bn4(x)
        x = F.relu(x)
        x = self.dropout4(x)
        x = self.fc5(x)
        return x
def train(model, optimizer, criterion, train_loader, device):
    model.train()
    running_loss = 0.0
    running_abs_error = 0
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        predicted = model(inputs)
        loss = criterion(predicted, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
        # Accumulate the summed absolute error; dividing by the dataset size
        # below gives the mean absolute error (MAE). Note that drop_last=True
        # skips the final partial batch, so this is a close approximation.
        running_abs_error += abs(predicted - labels).sum().item()
    train_loss = running_loss / len(train_loader)
    train_mae = running_abs_error / len(train_loader.dataset)
    return train_loss, train_mae

def test(model, criterion, test_loader, device):
    model.eval()
    running_loss = 0.0
    running_abs_error = 0
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            predicted = model(inputs)
            loss = criterion(predicted, labels)
            running_loss += loss.item()
            running_abs_error += abs(predicted - labels).sum().item()
    test_loss = running_loss / len(test_loader)
    test_mae = running_abs_error / len(test_loader.dataset)
    return test_loss, test_mae
def train_model(model, optimizer, criterion, train_loader, test_loader, device, num_epochs=10):
    train_losses = []
    train_mae_scores = []
    test_losses = []
    test_mae_scores = []
    for epoch in range(num_epochs):
        train_loss, train_mae = train(model, optimizer, criterion, train_loader, device)
        test_loss, test_mae = test(model, criterion, test_loader, device)
        train_losses.append(train_loss)
        train_mae_scores.append(train_mae)
        test_losses.append(test_loss)
        test_mae_scores.append(test_mae)
        print(f"Epoch [{epoch+1}/{num_epochs}]: Train Loss: {train_loss:.4f}, Train MAE: {train_mae:.4f}, "
              f"Test Loss: {test_loss:.4f}, Test MAE: {test_mae:.4f}")
    return train_losses, train_mae_scores, test_losses, test_mae_scores
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
hidden_dims = [100, 64, 32, 16]
model = FeedForward(8, hidden_dims)
model = model.to(device)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
train_losses, train_mae_scores, test_losses, test_mae_scores = train_model(
    model, optimizer, criterion, train_loader, test_loader, device)
plt.figure(figsize=(12, 4))
plt.subplot(1, 2, 1)
plt.plot(train_losses, label="Train Loss")
plt.plot(test_losses, label="Test Loss")
plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.legend()
plt.subplot(1, 2, 2)
plt.plot(train_mae_scores, label="Train MAE")
plt.plot(test_mae_scores, label="Test MAE")
plt.xlabel("Epochs")
plt.ylabel("mean absolute error")
plt.legend()
plt.show()