我是使用 pytorch 进行 seq2seq 的初学者,我想创建一个以文本作为输入、输出为数字模式的模型。
例如,“en_ids” 是已经转换为单词索引的输入序列,“NPY_DATA” 是与该输入对应的数值模式输出:
train_data[0]
{'NPY_DATA': tensor([2.0000, 0.5201, 0.3295, ..., 0.0000, 0.0000, 3.0000]),
'en_ids': tensor([ 2, 102, 0, 10, 0, 4, 0, 3]),
'SENTENCE': 'i call it painting the wall',
'en_tokens': ['<sos>', 'i', 'call', 'it', 'painting', 'the', 'wall', '<eos>']}
我这样定义编码器架构,这就是通用编码器架构。
class Encoder(nn.Module):
    """Embed a source token sequence and encode it with a stacked LSTM.

    Args:
        input_dim: Number of rows in the embedding table (source vocabulary size).
        embedding_dim: Width of each token embedding.
        hidden_dim: Number of features in the LSTM hidden state.
        n_layers: Number of stacked LSTM layers.
        dropout: Dropout probability, used both on the embeddings and as the
            LSTM's inter-layer dropout.
    """

    def __init__(self, input_dim, embedding_dim, hidden_dim, n_layers, dropout):
        super().__init__()
        self.hidden_dim = hidden_dim
        self.n_layers = n_layers
        self.embedding = nn.Embedding(input_dim, embedding_dim)
        self.rnn = nn.LSTM(embedding_dim, hidden_dim, n_layers, dropout=dropout)
        self.dropout = nn.Dropout(dropout)

    def forward(self, src):
        """Return the LSTM's final ``(hidden, cell)`` states for ``src``.

        ``src`` holds token indices laid out as [src length, batch size]; both
        returned tensors are [n layers, batch size, hidden dim].
        """
        # [src length, batch size] -> [src length, batch size, embedding dim]
        token_vectors = self.embedding(src)
        rnn_input = self.dropout(token_vectors)
        # The per-step outputs are not needed: only the final states are
        # handed on to the decoder.
        _, final_state = self.rnn(rnn_input)
        hidden, cell = final_state
        return hidden, cell
接下来是解码器。我不确定这里是否应该使用嵌入层,因为我的输出是连续的数值,而不是单词索引。
class Decoder(nn.Module):
    """Single-step LSTM decoder that predicts one output token per call.

    Args:
        output_dim: Size of the target vocabulary; also the width of the
            prediction returned by ``forward``.
        embedding_dim: Width of each target-token embedding.
        hidden_dim: Number of features in the LSTM hidden state.
        n_layers: Number of stacked LSTM layers.
        dropout: Dropout probability, used both on the embeddings and as the
            LSTM's inter-layer dropout.

    NOTE(review): ``forward`` looks tokens up in ``nn.Embedding``, so ``input``
    must contain integer indices in ``[0, output_dim)``. Continuous (float)
    targets cannot go through an embedding; they would need a different input
    layer (for example ``nn.Linear``) and a regression loss.
    """

    def __init__(self, output_dim, embedding_dim, hidden_dim, n_layers, dropout):
        super().__init__()
        self.output_dim = output_dim
        self.hidden_dim = hidden_dim
        self.n_layers = n_layers
        self.embedding = nn.Embedding(output_dim, embedding_dim)
        self.rnn = nn.LSTM(embedding_dim, hidden_dim, n_layers, dropout=dropout)
        self.fc_out = nn.Linear(hidden_dim, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, input, hidden, cell):
        """Decode one time step.

        Args:
            input: Token indices for the current step, shaped [batch size].
            hidden: Previous hidden state, [n layers, batch size, hidden dim].
            cell: Previous cell state, [n layers, batch size, hidden dim].

        Returns:
            ``(prediction, hidden, cell)`` where ``prediction`` is
            [batch size, output dim] and the states are the updated LSTM
            states with the same shapes as the inputs.
        """
        # Add a sequence axis of length 1: [batch size] -> [1, batch size]
        input = input.unsqueeze(0)
        # [1, batch size] -> [1, batch size, embedding dim]
        embedded = self.dropout(self.embedding(input))
        # Feed the embedded step together with the previous state. Passing the
        # raw indices, or omitting (hidden, cell), would both mismatch the
        # LSTM's expected input size and discard the encoder's context.
        output, (hidden, cell) = self.rnn(embedded, (hidden, cell))
        # output = [1, batch size, hidden dim] -> [batch size, hidden dim]
        prediction = self.fc_out(output.squeeze(0))
        # prediction = [batch size, output dim]
        return prediction, hidden, cell
我最终的 Seq2Seq 模型:
Seq2Seq(
(encoder): Encoder(
(embedding): Embedding(172, 300)
(rnn): LSTM(300, 1024, num_layers=2, dropout=0.5)
(dropout): Dropout(p=0.5, inplace=False)
)
(decoder): Decoder(
(embedding): Embedding(1662, 300)
(rnn): LSTM(300, 1024, num_layers=2, dropout=0.5)
(fc_out): Linear(in_features=1024, out_features=1662, bias=True)
(dropout): Dropout(p=0.5, inplace=False)
)
)
当我训练模型时,出现错误 “Dimension out of range (expected to be in range of [-1, 0], but got 1)”(维度超出范围:预期在 [-1, 0] 范围内,但得到 1)。在我看来,错误来自 train_fn:
def train_fn(
    model, data_loader, optimizer, criterion, clip, teacher_forcing_ratio, device
):
    """Train ``model`` for one pass over ``data_loader``.

    Each batch is a mapping with ``"en_ids"`` (source) and ``"NPY_DATA"``
    (target) tensors, both laid out as [sequence length, batch size]. The
    first target step is excluded from the loss, gradients are clipped to a
    total norm of ``clip``, and the mean per-batch loss is returned.
    """
    model.train()
    batch_losses = []
    for batch in data_loader:
        src = batch["en_ids"].to(device)
        trg = batch["NPY_DATA"].to(device)

        optimizer.zero_grad()
        # output = [trg length, batch size, trg vocab size]
        output = model(src, trg, teacher_forcing_ratio)

        # Drop step 0 and flatten the time and batch axes so the criterion
        # sees one row per predicted step.
        n_classes = output.shape[-1]
        flat_output = output[1:].view(-1, n_classes)
        flat_trg = trg[1:].view(-1)

        loss = criterion(flat_output, flat_trg)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()
        batch_losses.append(loss.item())
    return sum(batch_losses) / len(data_loader)
下面是一个训练序列到序列(编码器-解码器)模型的示例:输入是文本,目标是长度可变的数字序列。
输出:
Model size (kParams): 7.056
[epoch 1] train loss: 5.004
[epoch 100] train loss: 2.737
...
[epoch 600] train loss: 1.106
[epoch 700] train loss: 0.690
Inference run on training set:
Sample 0: I want to eat fruit
Target: [1, 2, 4, 5, -1]
Model: [0, 3, 4, 5, -1]
Sample 1: It is a cat
Target: [-2, 12, 3, 5, 6, 7]
Model: [-1, 12, 3, 5, 6, 7, -9, 9]
Sample 2: The aileron is the control surface in the wing...
Target: [1, 6, 3, 5, -1]
Model: [1, 6, 3, 5, -1]
Sample 3: By moving the stick, you cause pressure to...
Target: [1, 9, 2, 5, -1, 4]
Model: [1, 9, 3, 4, -1, 4]
Sample 4: The elevator is the part that moves with the...
Target: [7, 8, 9, 5, -2, 5]
Model: [2, 8, 5, 6, -2, 5]
...
句子首先被标记为单词。所有单词构成数据集的词汇表,词汇列表用于将每个单词映射到一个数值。
编码器是一个序列到向量模型,它将输入序列映射到单个编码。编码器首先使用嵌入层将输入标记映射到向量。然后编码器的 LSTM 逐步遍历句子中的每个单词,直到到达结尾。最终的单元状态向量表示该句子的编码,并被馈送到解码器。
解码器将编码映射为与目标序列长度相同的数字序列。首先把解码器的输入(即编码向量)复制若干份,使其长度与目标序列一致,这样得到的输出序列长度就与目标序列长度相同。然后计算输出序列与目标序列之间的损失,并执行一步优化器更新。该模型一次只输入一个样本,因为批处理需要额外的逻辑来处理长度不同的序列。
对于推理,我们没有目标(即我们不知道输出应该多长),因此我们根据需要继续运行解码器多次,直到它输出停止标记。
我对目标和停止标记(分类)使用了回归损失;更完整的方法将实现两个输出并为每个输出使用单独的损失。该数据集只是一些用于演示目的的示例,但您还需要一个验证集。
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
# Load the demo data: one sentence per row in 'input' and a comma-separated
# string of integers per row in 'target'.
df = pd.read_csv('sample_data.csv')
sentences = df['input'].values
# Parse each target string into a list of ints
targets = [list(map(int, target_str.split(','))) for target_str in df['target'].values]
#
# Define the vocabulary
#
# Tokenise sentences into lowercase words. Every '.' is replaced by a separate
# '<EOS>' token (the replacement happens after lower(), so the token keeps its
# upper-case spelling). Commas are not handled and stay attached to the
# neighbouring word.
sentences_tokenised = [
    sentence.lower().replace('.', ' <EOS>').split() for sentence in sentences
]
all_tokens = np.concatenate(sentences_tokenised)
# Get the vocab list. '<EOS>' is merged into the set rather than appended:
# it is already among the tokens whenever a sentence contains '.', and
# appending it again would create a duplicate entry and an unused index.
vocab = sorted(set(all_tokens) | {'<EOS>'})
# Map each word to its index in the vocab list
vocab_dict = {word: index for index, word in enumerate(vocab)}
#
# Create a dataset
#
# Convert each tokenised sentence to a sequence of vocab indices
sentences_numerical = [
    [vocab_dict[token] for token in tokenised] for tokenised in sentences_tokenised
]
# Dataset of (numerical sentence, target) pairs.
# Each item is an (X, y) tuple; X has one index per token and y has one value
# per target element plus a trailing end-of-target marker.
class MyDataset(Dataset):
    """Pair index-encoded sentences with their numeric target sequences.

    Args:
        sentences_numerical: Sequence of token-index lists, one per sample.
        targets: Sequence of target lists, aligned with ``sentences_numerical``.
        end_of_target: Marker value appended to every target on retrieval.
    """

    def __init__(self, sentences_numerical, targets, end_of_target=-10):
        self.sentences_numerical = sentences_numerical
        self.targets = targets
        self.end_of_target = end_of_target

    def __len__(self):
        """Return the number of samples."""
        return len(self.sentences_numerical)

    def __getitem__(self, index):
        """Return ``(token indices as long tensor, target as float tensor)``."""
        token_ids = self.sentences_numerical[index]
        # Build a new list so the stored target is left untouched.
        target_with_stop = self.targets[index] + [self.end_of_target]
        features = torch.tensor(token_ids, dtype=torch.long)
        labels = torch.tensor(target_with_stop).float()
        return features, labels
# Training set; the default end-of-target marker is used.
train_dataset = MyDataset(sentences_numerical=sentences_numerical, targets=targets)
#
# Define a simple encoder-decoder model
#
# Wraps a plain callable so it can be used as a layer (e.g. in nn.Sequential)
class LambdaLayer(nn.Module):
    """Module whose forward pass simply applies a user-supplied callable.

    Args:
        func: Callable taking the layer input and returning the layer output.
    """

    def __init__(self, func):
        super().__init__()
        self.func = func

    def forward(self, x):
        """Return ``func(x)``."""
        result = self.func(x)
        return result
# Embedding layer
#  input shape:  (L,) token indices
#  output shape: (L, embedding_size)
embedding_size = 16
embedding_layer = nn.Embedding(len(vocab), embedding_size)

# Encoder LSTM
#  input shape:  (L, embedding_size)
#  returns:      output, (h_n, c_n)
#    output is (L, hidden_size)
#    h_n and c_n are both (num_layers, hidden_size)
encoder_hidden_size = 16
lstm_encoder = nn.LSTM(embedding_size, encoder_hidden_size, num_layers=2)


def _last_layer_cell_state(lstm_result):
    """Pick the top layer's final cell state out of an LSTM result tuple."""
    _, (_, c_n) = lstm_result
    return c_n[-1, :]


# Encoder net: token indices -> embeddings -> LSTM -> final cell state of the
# last layer, shaped (hidden_size,)
encoder = nn.Sequential(
    embedding_layer,
    lstm_encoder,
    LambdaLayer(_last_layer_cell_state),
)
# Decoder class
#  input:  encoding vector with input_size elements
#  output: (target L, target_size)
class Decoder(nn.Module):
    """Unroll a fixed encoding into a sequence with a 2-layer projected LSTM.

    Args:
        input_size: Number of elements in the encoding fed at every step.
        hidden_size: Number of features in the LSTM hidden state.
        target_size: Projection size of the LSTM, i.e. the width of each
            output step.
    """

    def __init__(self, input_size, hidden_size, target_size):
        super().__init__()
        self.lstm_decoder = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=2,
            proj_size=target_size,
        )

    def forward(self, x, target_L):
        """Return the LSTM outputs for ``target_L`` steps of the same input.

        ``x`` is flattened to a single row and repeated ``target_L`` times, so
        every time step receives the identical encoding.
        """
        # (input_size,) -> (1, input_size) -> (target L, input_size)
        repeated_encoding = x.reshape(1, -1).repeat(target_L, 1)
        # Final states are not needed; only the per-step outputs are returned.
        output, _ = self.lstm_decoder(repeated_encoding)
        return output
# Decoder: consumes the encoder's final cell state and emits one value per step
decoder = Decoder(encoder_hidden_size, encoder_hidden_size, target_size=1)

# Group encoder and decoder in one container so their parameters (and
# train/eval mode) can be handled together.
encoder_decoder = nn.Sequential(encoder, decoder)
optimiser = torch.optim.NAdam(encoder_decoder.parameters())

# Report the number of trainable parameters, in thousands
trainable_param_count = sum(
    p.numel() for p in encoder_decoder.parameters() if p.requires_grad
)
print('Model size (kParams):', trainable_param_count / 1e3)
#
# Train
#
# The loss module is stateless, so build it once instead of once per sample.
loss_fn = nn.HuberLoss()

for epoch in range(500):
    encoder_decoder.train()

    # Visit the samples in a fresh random order each epoch, one at a time
    shuffled_ixs = torch.randperm(len(train_dataset))
    cumulative_loss = 0.0

    for sample_ix in shuffled_ixs:
        sample, target = train_dataset[sample_ix]

        encoding = encoder(sample)
        predicted_sequence = decoder(encoding, target_L=len(target))
        loss = loss_fn(predicted_sequence.flatten(), target)

        # Step optimiser
        optimiser.zero_grad()
        loss.backward()
        optimiser.step()

        # Accumulate a plain Python float. Adding the tensor itself would keep
        # every sample's autograd history alive until the end of the epoch.
        cumulative_loss += loss.item()

    # Report on the first epoch and then every 20th
    if epoch == 0 or (epoch + 1) % 20 == 0:
        print(
            f'[epoch {epoch+1:>3d}]',
            f'train loss: {cumulative_loss/len(train_dataset):>5.3f}'
        )
#
# View results (training set)
#
@torch.no_grad()
def get_predicted_sequence(encoder, decoder, sample, max_L=10, end_of_target=None):
    """Predict a sequence for ``sample`` and cut it at the stop marker.

    Both modules are switched to eval mode, the decoder is run for ``max_L``
    steps, and the flattened prediction is truncated just before the first
    element that rounds to ``end_of_target``.

    Args:
        encoder: Module mapping ``sample`` to an encoding.
        decoder: Module called as ``decoder(encoding, target_L=max_L)``.
        sample: Input passed to ``encoder``.
        max_L: Number of decoder steps to generate.
        end_of_target: Stop-marker value. When ``None`` (the default) the
            marker of the module-level ``train_dataset`` is used.

    Returns:
        1-D tensor with the predicted values before the stop marker, or all
        ``max_L`` values if no element rounds to the marker.
    """
    if end_of_target is None:
        end_of_target = train_dataset.end_of_target

    encoder.eval()
    decoder.eval()

    encoding = encoder(sample)
    predicted_sequence = decoder(encoding, target_L=max_L).ravel()

    # Stay in torch (rather than converting to NumPy) so this also works for
    # tensors that do not live on the CPU. nonzero returns indices in
    # ascending order, so the first entry is the earliest stop position.
    stop_positions = torch.nonzero(predicted_sequence.round() == end_of_target).ravel()
    if stop_positions.numel() == 0:
        return predicted_sequence
    return predicted_sequence[: int(stop_positions[0])]
# Show, for every training sample, the sentence, its target (without the
# trailing end-of-target marker) and the model's rounded prediction.
for i, (sample, target) in enumerate(train_dataset):
    print(f'Sample {i}:', sentences[i])

    target_values = target[:-1].to(int).tolist()
    print(' Target:', target_values)

    prediction = get_predicted_sequence(encoder, decoder, sample)
    model_values = prediction.ravel().round().to(int).tolist()
    print(' Model: ', model_values)

    print()