可以用文本输入和输出(数字模式)构建序列到序列模型 rnn 吗?

问题描述 投票:0回答:1

我是使用 pytorch 进行 seq2seq 的初学者,我想创建一个以文本作为输入、输出为数字模式的模型。

例如,我的“en_ids”是已经转换为单词索引的输入,“NPY_DATA”是该输入的模式输出

train_data[0]

{'NPY_DATA': tensor([2.0000, 0.5201, 0.3295,  ..., 0.0000, 0.0000, 3.0000]),
 'en_ids': tensor([  2, 102,   0,  10,   0,   4,   0,   3]),
 'SENTENCE': 'i call it painting the wall',
 'en_tokens': ['<sos>', 'i', 'call', 'it', 'painting', 'the', 'wall', '<eos>']}

我这样定义编码器架构,这就是通用编码器架构。

class Encoder(nn.Module):
    def __init__(self, input_dim, embedding_dim, hidden_dim, n_layers, dropout):
        super().__init__()
        self.hidden_dim = hidden_dim
        self.n_layers = n_layers
        self.embedding = nn.Embedding(input_dim, embedding_dim)
        self.rnn = nn.LSTM(embedding_dim, hidden_dim, n_layers, dropout=dropout)
        self.dropout = nn.Dropout(dropout)

    def forward(self, src):
        # src = [src length, batch size]
        embedded = self.dropout(self.embedding(src))
        # embedded = [src length, batch size, embedding dim]
        outputs, (hidden, cell) = self.rnn(embedded)
        # outputs = [src length, batch size, hidden dim * n directions]
        # hidden = [n layers * n directions, batch size, hidden dim]
        # cell = [n layers * n directions, batch size, hidden dim]
        # outputs are always from the top hidden layer
        return hidden, cell

在解码器之后,我不确定是否应该使用嵌入层?因为我的输出是数字,不像单词索引。

class Decoder(nn.Module):
    def __init__(self, output_dim, embedding_dim, hidden_dim, n_layers, dropout):
        super().__init__()
        self.output_dim = output_dim
        self.hidden_dim = hidden_dim
        self.n_layers = n_layers
        self.embedding = nn.Embedding(output_dim, embedding_dim)
        self.rnn = nn.LSTM(embedding_dim, hidden_dim, n_layers, dropout=dropout)
        self.fc_out = nn.Linear(hidden_dim, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, input, hidden, cell):
        # input = [batch size]
        # hidden = [n layers * n directions, batch size, hidden dim]
        # cell = [n layers * n directions, batch size, hidden dim]
        # n directions in the decoder will both always be 1, therefore:
        # hidden = [n layers, batch size, hidden dim]
        # context = [n layers, batch size, hidden dim]
        input = input.unsqueeze(0)
        # input = [1, batch size]
    #    embedded = self.dropout(self.embedding(input))
        #embedded = [1, batch size, embedding dim]
        output, (hidden, cell) = self.rnn(input)
    #    output, (hidden, cell) = self.rnn(embedded, (hidden, cell))
        # output = [seq length, batch size, hidden dim * n directions]
        # hidden = [n layers * n directions, batch size, hidden dim]
        # cell = [n layers * n directions, batch size, hidden dim]
        # seq length and n directions will always be 1 in this decoder, therefore:
        # output = [1, batch size, hidden dim]
        # hidden = [n layers, batch size, hidden dim]
        # cell = [n layers, batch size, hidden dim]
        prediction = self.fc_out(output.squeeze(0))
        # prediction = [batch size, output dim]
        return prediction, hidden, cell

我最终的 Seq2Seq 模型:

Seq2Seq(
  (encoder): Encoder(
    (embedding): Embedding(172, 300)
    (rnn): LSTM(300, 1024, num_layers=2, dropout=0.5)
    (dropout): Dropout(p=0.5, inplace=False)
  )
  (decoder): Decoder(
    (embedding): Embedding(1662, 300)
    (rnn): LSTM(300, 1024, num_layers=2, dropout=0.5)
    (fc_out): Linear(in_features=1024, out_features=1662, bias=True)
    (dropout): Dropout(p=0.5, inplace=False)
  )
)

当我训练模型时,出现错误“Dimension out of range(预计在 [-1, 0] 范围内,但得到 1)” 在我看来,它来自

train_fn

def train_fn(
    model, data_loader, optimizer, criterion, clip, teacher_forcing_ratio, device
):
    model.train()
    epoch_loss = 0
    for i, batch in enumerate(data_loader):
        src = batch["en_ids"].to(device)   # torch.Size([61, 80])
        trg = batch["NPY_DATA"].to(device) # torch.Size([8312, 80])
        # src = [src length, batch size]
        # trg = [trg length, batch size]
        optimizer.zero_grad()
        output = model(src, trg, teacher_forcing_ratio) #-----------------------> Error this line
        # output = [trg length, batch size, trg vocab size]
        output_dim = output.shape[-1]
        output = output[1:].view(-1, output_dim)
        # output = [(trg length - 1) * batch size, trg vocab size]
        trg = trg[1:].view(-1)
        # trg = [(trg length - 1) * batch size]
        loss = criterion(output, trg)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()
        epoch_loss += loss.item()
    return epoch_loss / len(data_loader)
  • 问题
  1. 我可以构建 seq2seq 模型,因为输入是文本,输出是数字模式吗?
  2. 我不确定在解码器层中我应该添加嵌入层吗?因为该模型不像文本翻译。
  3. 如何解决“维度超出范围(预期在 [-1, 0] 范围内,但得到 1)”错误从何而来以及如何解决。
deep-learning pytorch sequence recurrent-neural-network
1个回答
0
投票

训练序列到序列编码器-解码器模型的示例,其中输入是文本,可变目标序列是数字。

输出:

Model size (kParams): 7.056
[epoch   1] train loss: 5.004
[epoch 100] train loss: 2.737
...
[epoch 600] train loss: 1.106
[epoch 700] train loss: 0.690

Inference run on training set:

Sample 0: I want to eat fruit
  Target: [1, 2, 4, 5, -1]
  Model:  [0, 3, 4, 5, -1]

Sample 1: It is a cat
  Target: [-2, 12, 3, 5, 6, 7]
  Model:  [-1, 12, 3, 5, 6, 7, -9, 9]

Sample 2: The aileron is the control surface in the wing...
  Target: [1, 6, 3, 5, -1]
  Model:  [1, 6, 3, 5, -1]

Sample 3: By moving the stick, you cause pressure to...
  Target: [1, 9, 2, 5, -1, 4]
  Model:  [1, 9, 3, 4, -1, 4]

Sample 4: The elevator is the part that moves with the...
  Target: [7, 8, 9, 5, -2, 5]
  Model:  [2, 8, 5, 6, -2, 5]
...

句子首先被标记为单词。所有单词构成数据集的词汇表,词汇列表用于将每个单词映射到一个数值。

编码器是一个序列到向量模型,它将输入序列映射到单个编码。编码器首先使用嵌入层将输入标记映射到向量。然后编码器的 LSTM 逐步遍历句子中的每个单词,直到到达结尾。最终的单元状态向量表示该句子的编码,并被馈送到解码器。

解码器将编码映射到与目标序列长度相同的数字序列。首先复制解码器的输入以匹配目标序列的长度。这意味着我们得到与目标序列长度匹配的输出序列长度。计算输出序列和目标序列之间的损失,并且优化器是步进的。该模型一次输入一个样本,因为批处理需要一定的复杂性来管理不同的序列长度。

对于推理,我们没有目标(即我们不知道输出应该多长),因此我们根据需要继续运行解码器多次,直到它输出停止标记。

我对目标和停止标记(分类)使用了回归损失;更完整的方法将实现两个输出并为每个输出使用单独的损失。该数据集只是一些用于演示目的的示例,但您还需要一个验证集。


import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

df = pd.read_csv('sample_data.csv')

sentences = df['input'].values

#Convert targets from str to int
targets = [[int(s) for s in target.split(',')] for target in df['target'].values]


#
#Define the vocabulary
#

#Tokenise sentences into words
# There are commas too, but I haven't handled those
sentences_tokenised = [sentence.lower().replace('.', ' <EOS>').split() for sentence in sentences]
all_tokens = np.concatenate(sentences_tokenised)

#Get the vocab list
vocab = sorted(set(all_tokens)) + ['<EOS>']

#Define a vocab dict that maps words to their index in the dictionary
vocab_dict = {word: index for index, word in enumerate(vocab)}

#
# Create a dataset
#

#Convert text sentences to a sequence of indices
sentences_numerical = [
    [vocab_dict[token] for token in tokenised] for tokenised in sentences_tokenised
]

#Define a Dataset class that returns an
# (X, y) tuple of (numerical sentence, target) for each sample.
# Shapes are X: (sample L, 1), y: (target L, 1)
class MyDataset (Dataset):
    def __init__(self, sentences_numerical, targets, end_of_target=-10):
        self.sentences_numerical = sentences_numerical
        self.targets = targets
        self.end_of_target = end_of_target
        
    def __len__(self):
        return len(self.sentences_numerical)
    
    def __getitem__(self, index):
        numerical = self.sentences_numerical[index]
        target = self.targets[index] + [self.end_of_target]
        
        return torch.tensor(numerical, dtype=torch.long), torch.tensor(target).float()

train_dataset = MyDataset(sentences_numerical, targets)

#
# Define a simple encoder-decoder model
#

#A lambda layer is useful for simple manipulations
class LambdaLayer(nn.Module):
    def __init__(self, func):
        super().__init__()
        self.func = func
    
    def forward(self, x):
        return self.func(x)
    
#Embedding layer
# input shape: (L, 1)
# output shape:(L, embedding_size) 
embedding_size = 16
embedding_layer = nn.Embedding(num_embeddings=len(vocab),
                               embedding_dim=embedding_size)

#Encoder LSTM
#input shape: (L, embedding_size)
#output shape: output, (h_n, c_n)
# output is (L, hidden_size)
# (h_n, c_n) are both (num_layers, hidden_size)
encoder_hidden_size = 16
lstm_encoder = nn.LSTM(
    input_size=embedding_size, hidden_size=encoder_hidden_size, num_layers=2
)

#Build the encoder net
encoder = nn.Sequential(
    #in> (L, 1)
    
    embedding_layer,
    #out> (L, embedding_size)
    
    lstm_encoder,
    #out> output, (h_n, c_n)
    
    LambdaLayer(lambda output_hncn: output_hncn[1][1][-1, :])
    #out> final layer's c_n shaped (hidden_size,) 
)

#Decoder class
#input shape: hidden_size
#output shape: (target L, 1)
class Decoder (nn.Module):
    def __init__(self, input_size, hidden_size, target_size):
        super().__init__()
        
        self.lstm_decoder = nn.LSTM(
            input_size, hidden_size, proj_size=target_size, num_layers=2
        )
    
    def forward(self, x, target_L):
        x_tiled = torch.tile(x.reshape(1, -1), dims=[target_L, 1])
        
        #in> (target L, enc hidden_size)
        output, (h_n, c_n) = self.lstm_decoder(x_tiled)
        #out> (target L, self.target_size)
        
        return output

#Create the decoder
decoder = Decoder(
    input_size=encoder_hidden_size, hidden_size=encoder_hidden_size, target_size=1
)

#Define the encoder-decoder
encoder_decoder = nn.Sequential(encoder, decoder)
optimiser = torch.optim.NAdam(encoder_decoder.parameters())

print(
    'Model size (kParams):',
    sum([p.numel() for p in encoder_decoder.parameters() if p.requires_grad]) / 1e3
)


#
# Train
#
for epoch in range(500):
    encoder_decoder.train()
    
    shuffled_ixs = torch.randperm(len(train_dataset))
    cumulative_loss = 0
    
    for sample_ix in shuffled_ixs:
        sample, target = train_dataset[sample_ix]
        
        encoding = encoder(sample)        
        predicted_sequence = decoder(encoding, target_L=len(target))
        loss = nn.HuberLoss()(predicted_sequence.flatten(), target)
        
        #Step optimiser
        optimiser.zero_grad()
        loss.backward()
        optimiser.step()
        
        cumulative_loss += loss
    
    if epoch == 0 or (epoch + 1) % 20 == 0:
        print(
            f'[epoch {epoch+1:>3d}]',
            f'train loss: {cumulative_loss/len(train_dataset):>5.3f}'
        )

#
# View results (training set)
#
@torch.no_grad()
def get_predicted_sequence(encoder, decoder, sample, max_L=10):
    encoder.eval()
    decoder.eval()
    
    encoding = encoder(sample)
    predicted_sequence = decoder(encoding, target_L=max_L).ravel()
    
    end_index = np.argwhere(predicted_sequence.round() == train_dataset.end_of_target).ravel()
    return predicted_sequence[:end_index.min()] if len(end_index) else predicted_sequence
    
for i, (sample, target) in enumerate(train_dataset):
    print(f'Sample {i}:', sentences[i])
    print('  Target:', target[:-1].to(int).tolist())
    print(
        '  Model: ',
        get_predicted_sequence(encoder, decoder, sample).ravel().round().to(int).tolist()
    )
    print()
© www.soinside.com 2019 - 2024. All rights reserved.