如何使 CNN 对 DNA 序列中模式的位置不变?

问题描述 投票:0回答:1

我正在尝试使用 CNN 在 DNA 序列中查找模式(例如“CTCATGTCA”)来进行二元分类。我用pytorch写了一个模型。当模式位于序列的中心时,模型会检测到它。但如果该模式出现在随机位置,则该模型不起作用。如何使 CNN 对模式的位置不变?

这是我的代码:

import logging

import torch
import torch.nn as nn
import torch.optim as optim
import pandas as pd
from sklearn import metrics
from skorch import NeuralNetClassifier
from skorch.callbacks import EpochScoring
from torch.utils.data import DataLoader, Dataset
import numpy as np

import constants

timber = logging.getLogger()
logging.basicConfig(level=logging.INFO)  # change to level=logging.DEBUG to print more logs...


# utils

def one_hot_e(dna_seq: str) -> np.ndarray:
  mydict = {'A': np.asarray([1.0, 0.0, 0.0, 0.0]), 'C': np.asarray([0.0, 1.0, 0.0, 0.0]),
            'G': np.asarray([0.0, 0.0, 1.0, 0.0]), 'T': np.asarray([0.0, 0.0, 0.0, 1.0]),
            'N': np.asarray([0.0, 0.0, 0.0, 0.0]), 'H': np.asarray([0.0, 0.0, 0.0, 0.0]),
            'a': np.asarray([1.0, 0.0, 0.0, 0.0]), 'c': np.asarray([0.0, 1.0, 0.0, 0.0]),
            'g': np.asarray([0.0, 0.0, 1.0, 0.0]), 't': np.asarray([0.0, 0.0, 0.0, 1.0]),
            'n': np.asarray([0.0, 0.0, 0.0, 0.0]), '-': np.asarray([0.0, 0.0, 0.0, 0.0])}

  size_of_a_seq: int = len(dna_seq)

  # forward = np.zeros(shape=(size_of_a_seq, 4))

  forward_list: list = [mydict[dna_seq[i]] for i in range(0, size_of_a_seq)]
  encoded = np.asarray(forward_list)
  return encoded


def one_hot_e_column(column: pd.Series) -> np.ndarray:
  tmp_list: list = [one_hot_e(seq) for seq in column]
  encoded_column = np.asarray(tmp_list)
  return encoded_column


def reverse_dna_seq(dna_seq: str) -> str:
  # m_reversed = ""
  # for i in range(0, len(dna_seq)):
  #     m_reversed = dna_seq[i] + m_reversed
  # return m_reversed
  return dna_seq[::-1]


def complement_dna_seq(dna_seq: str) -> str:
  comp_map = {"A": "T", "C": "G", "T": "A", "G": "C",
              "a": "t", "c": "g", "t": "a", "g": "c",
              "N": "N", "H": "H", "-": "-",
              "n": "n", "h": "h"
              }

  comp_dna_seq_list: list = [comp_map[nucleotide] for nucleotide in dna_seq]
  comp_dna_seq: str = "".join(comp_dna_seq_list)
  return comp_dna_seq


def reverse_complement_dna_seq(dna_seq: str) -> str:
  return reverse_dna_seq(complement_dna_seq(dna_seq))


def reverse_complement_dna_seqs(column: pd.Series) -> pd.Series:
  tmp_list: list = [reverse_complement_dna_seq(seq) for seq in column]
  rc_column = pd.Series(tmp_list)
  return rc_column


class CNN1D(nn.Module):
  def __init__(self,
               in_channel_num_of_nucleotides=4,
               kernel_size_k_mer_motif=4,
               dnn_size=256,
               num_filters=1,
               lstm_hidden_size=128,
               *args, **kwargs):
    super().__init__(*args, **kwargs)
    self.conv1d = nn.Conv1d(in_channels=in_channel_num_of_nucleotides, out_channels=num_filters,
                            kernel_size=kernel_size_k_mer_motif, stride=2)
    self.activation = nn.ReLU()
    self.pooling = nn.MaxPool1d(kernel_size=kernel_size_k_mer_motif, stride=2)

    self.flatten = nn.Flatten()
    # linear layer

    self.dnn2 = nn.Linear(in_features=14 * num_filters, out_features=dnn_size)
    self.act2 = nn.Sigmoid()
    self.dropout2 = nn.Dropout(p=0.2)

    self.out = nn.Linear(in_features=dnn_size, out_features=1)
    self.out_act = nn.Sigmoid()

    pass

  def forward(self, x):
    timber.debug(constants.magenta + f"h0: {x}")
    h = self.conv1d(x)
    timber.debug(constants.green + f"h1: {h}")
    h = self.activation(h)
    timber.debug(constants.magenta + f"h2: {h}")
    h = self.pooling(h)
    timber.debug(constants.blue + f"h3: {h}")
    timber.debug(constants.cyan + f"h4: {h}")

    h = self.flatten(h)
    timber.debug(constants.magenta + f"h5: {h},\n shape {h.shape}, size {h.size}")
    h = self.dnn2(h)
    timber.debug(constants.green + f"h6: {h}")

    h = self.act2(h)
    timber.debug(constants.blue + f"h7: {h}")

    h = self.dropout2(h)
    timber.debug(constants.cyan + f"h8: {h}")

    h = self.out(h)
    timber.debug(constants.magenta + f"h9: {h}")

    h = self.out_act(h)
    timber.debug(constants.green + f"h10: {h}")
    # h = (h > 0.5).float()  # <---- should this go here?
    # timber.debug(constants.green + f"h11: {h}")

    return h


class CustomDataset(Dataset):
  def __init__(self, dataframe):
    self.x = dataframe["Sequence"]
    self.y = dataframe["class"]

  def __len__(self):
    return len(self.y)

  def preprocessing(self, x1, y1) -> (torch.Tensor, torch.Tensor, torch.Tensor):
    forward_col = x1

    backward_col = reverse_complement_dna_seqs(forward_col)

    forward_one_hot_e_col: np.ndarray = one_hot_e_column(forward_col)
    backward_one_hot_e_col: np.ndarray = one_hot_e_column(backward_col)

    tr_xf_tensor = torch.Tensor(forward_one_hot_e_col).permute(1, 2, 0)
    tr_xb_tensor = torch.Tensor(backward_one_hot_e_col).permute(1, 2, 0)
    # timber.debug(f"y1 {y1}")
    tr_y1 = np.array([y1])  # <--- need to put it inside brackets

    return tr_xf_tensor, tr_xb_tensor, tr_y1

  def __getitem__(self, idx):
    m_seq = self.x.iloc[idx]
    labels = self.y.iloc[idx]
    xf, xb, y = self.preprocessing(m_seq, labels)
    timber.debug(f"xf -> {xf.shape}, xb -> {xb.shape}, y -> {y}")
    return xf, xb, y


def test_dataloader():
  df = pd.read_csv("todo.csv")
  X = df["Sequence"]
  y = df["class"]

  ds = CustomDataset(df)
  loader = DataLoader(ds, shuffle=True, batch_size=16)

  train_loader = loader

  for data in train_loader:
    timber.debug(data)
    # xf, xb, y = data[0], data[1], data[2]
    # timber.debug(f"xf -> {xf.shape}, xb -> {xb.shape}, y -> {y.shape}")
  pass


def get_callbacks() -> list:
  # metric.auc ( uses trapezoidal rule) gave an error: x is neither increasing, nor decreasing. so I had to remove it
  return [
    ("tr_acc", EpochScoring(
      metrics.accuracy_score,
      lower_is_better=False,
      on_train=True,
      name="train_acc",
    )),

    ("tr_recall", EpochScoring(
      metrics.recall_score,
      lower_is_better=False,
      on_train=True,
      name="train_recall",
    )),
    ("tr_precision", EpochScoring(
      metrics.precision_score,
      lower_is_better=False,
      on_train=True,
      name="train_precision",
    )),
    ("tr_roc_auc", EpochScoring(
      metrics.roc_auc_score,
      lower_is_better=False,
      on_train=False,
      name="tr_auc"
    )),
    ("tr_f1", EpochScoring(
      metrics.f1_score,
      lower_is_better=False,
      on_train=False,
      name="tr_f1"
    )),
    # ("valid_acc1", EpochScoring(
    #   metrics.accuracy_score,
    #   lower_is_better=False,
    #   on_train=False,
    #   name="valid_acc1",
    # )),
    ("valid_recall", EpochScoring(
      metrics.recall_score,
      lower_is_better=False,
      on_train=False,
      name="valid_recall",
    )),
    ("valid_precision", EpochScoring(
      metrics.precision_score,
      lower_is_better=False,
      on_train=False,
      name="valid_precision",
    )),
    ("valid_roc_auc", EpochScoring(
      metrics.roc_auc_score,
      lower_is_better=False,
      on_train=False,
      name="valid_auc"
    )),
    ("valid_f1", EpochScoring(
      metrics.f1_score,
      lower_is_better=False,
      on_train=False,
      name="valid_f1"
    ))
  ]


def start():

  # df = pd.read_csv("data64.csv")  # use this line
  df = pd.read_csv("data64random.csv")
  X = df["Sequence"]
  y = df["class"]

  npa = np.array([y.values])

  torch_tensor = torch.tensor(npa)  # [0, 1, 1, 0, ... ... ] a simple list
  print(f"torch_tensor: {torch_tensor}")
  # need to transpose it!

  yt = torch.transpose(torch_tensor, 0, 1)

  ds = CustomDataset(df)
  loader = DataLoader(ds, shuffle=True)

  # train_loader = loader
  # test_loader = loader  # todo: load another dataset later

  device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
  model = CNN1D().to(device)
  m_criterion = nn.BCEWithLogitsLoss
  # optimizer = optim.Adam(model.parameters(), lr=0.001)
  m_optimizer = optim.Adam

  net = NeuralNetClassifier(
    model,
    max_epochs=200,
    criterion=m_criterion,
    optimizer=m_optimizer,
    lr=0.01,
    # decay=0.01,
    # momentum=0.9,

    device=device,
    classes=["no_mqtl", "yes_mqtl"],
    verbose=True,
    callbacks=get_callbacks()
  )

  ohe_c = one_hot_e_column(X)
  print(f"ohe_c shape {ohe_c.shape}")
  ohe_c = torch.Tensor(ohe_c)
  ohe_c = ohe_c.permute(0, 2, 1)
  ohe_c = ohe_c.to(device)
  print(f"ohe_c shape {ohe_c.shape}")

  net.fit(X=ohe_c, y=yt)
  y_proba = net.predict_proba(ohe_c)
  # timber.info(f"y_proba = {y_proba}")
  pass


if __name__ == '__main__':
  start()
  # test_dataloader()
  pass

你可以找到2个数据集

  1. dna64random.csv(模型不适用于此)
  2. dna64.csv(模型适用)

您可以使用此要点链接

快速下载所有内容
scikit-learn deep-learning pytorch conv-neural-network skorch
1个回答
0
投票

下面的模型使用

Conv1d
层,验证准确度达到 99% 到 100%。不需要太多调整就可以到达那里。

我训练了 60% 的数据,其中 30% 用于验证,因此 val 集应该很有代表性。

模型相对较小,大约有700个参数。该模型在输入端具有较宽的感受野,并且与模式长度相匹配。模型性能对此初始配置非常敏感。随后的卷积层将特征大小加倍到 8,并稍微修剪序列。最后的 maxpool 层将剩余序列长度减半,然后将其展平并通过密集层映射到标量。

我发现 NAdam 比 Adam 效果更好,并且没有进一步探索优化器。 8 及以下的批量大小效果很好。

...
[epoch  13] trn loss: 0.011 [acc: 100.000%] | val loss: 0.011 [acc: 100.000%]
[epoch  14] trn loss: 0.014 [acc: 100.000%] | val loss: 0.009 [acc:  99.667%]
[epoch  15] trn loss: 0.008 [acc: 100.000%] | val loss: 0.006 [acc: 100.000%]

enter image description here

我尝试了卷积层而不是循环单元,因为它们往往更快并且性能良好。由于性能很好,我没有研究 LSTM。您的模型看起来相当大 - 尝试将其缩小,看看是否会有所改善。


用于准备数据、定义和训练模型以及打印/绘制结果的代码。

import numpy as np
from matplotlib import pyplot as plt
import pandas as pd

import torch
from torch.utils.data import DataLoader
from torch import nn

np.random.seed(0)

def one_hot_e(dna_seq: str) -> np.ndarray:
  mydict = {'A': np.asarray([1.0, 0.0, 0.0, 0.0]), 'C': np.asarray([0.0, 1.0, 0.0, 0.0]),
            'G': np.asarray([0.0, 0.0, 1.0, 0.0]), 'T': np.asarray([0.0, 0.0, 0.0, 1.0]),
            'N': np.asarray([0.0, 0.0, 0.0, 0.0]), 'H': np.asarray([0.0, 0.0, 0.0, 0.0]),
            'a': np.asarray([1.0, 0.0, 0.0, 0.0]), 'c': np.asarray([0.0, 1.0, 0.0, 0.0]),
            'g': np.asarray([0.0, 0.0, 1.0, 0.0]), 't': np.asarray([0.0, 0.0, 0.0, 1.0]),
            'n': np.asarray([0.0, 0.0, 0.0, 0.0]), '-': np.asarray([0.0, 0.0, 0.0, 0.0])}

  size_of_a_seq: int = len(dna_seq)

  # forward = np.zeros(shape=(size_of_a_seq, 4))

  forward_list: list = [mydict[dna_seq[i]] for i in range(0, size_of_a_seq)]
  encoded = np.asarray(forward_list)
  return encoded

#
#Load and prepare data    "CTCATGTCA"
#
df = pd.read_csv('data64random.csv')

#To numpy arrays, and encode X
X = np.stack([one_hot_e(row) for row in df.Sequence], axis=0)
y = df['class'].values

#Shuffle and split
train_size = int(0.6 * len(X))
val_size = int(0.3 * len(X))

shuffle_ixs = np.random.permutation(len(X))
X, y = [arr[shuffle_ixs] for arr in [X, y]]

X_train, y_train = [arr[:train_size] for arr in [X, y]]
X_val, y_val = [arr[train_size:train_size + val_size] for arr in [X, y]]

#As tensors. Useful for passing directly to model as a single large batch.
X_train_t, y_train_t = [torch.tensor(arr).float() for arr in [X_train, y_train]]
X_val_t, y_val_t = [torch.tensor(arr).float() for arr in [X_val, y_val]]


#
#Define the model
#

#Lambda layer useful for simple manipulations
class LambdaLayer(nn.Module):
    def __init__(self, func):
        super().__init__()
        self.func = func
    
    def forward(self, x):
        return self.func(x)

#batch, 64, chan

#The model
seq_len = X[0].shape[0]    #64 characters long
n_features = X[0].shape[1] #4-dim onehot encoding

torch.manual_seed(0)
model = nn.Sequential(
    #> (batch, seq_len, channels)
    
    LambdaLayer(lambda x: x.swapdims(1, 2)),
    #> (batch, channels, seq_len)
    
    #Initial wide receptive field (and it matches length of the pattern)
    nn.Conv1d(in_channels=n_features, out_channels=4, kernel_size=9, padding='same'),
    nn.ReLU(),
    nn.BatchNorm1d(num_features=4),
    
    #Conv block 1 doubles features
    nn.Conv1d(in_channels=4, out_channels=8, kernel_size=3),
    nn.ReLU(),
    nn.BatchNorm1d(num_features=8),
    
    #Conv block 2 doubles features, then maxpool
    nn.Conv1d(in_channels=8, out_channels=8, kernel_size=3),
    nn.ReLU(),
    nn.BatchNorm1d(num_features=8),
    
    #Output layer: flatten, linear
    nn.MaxPool1d(kernel_size=2, stride=2), #batch, feat, 12
    nn.Flatten(start_dim=1), #batch, feat*12
    nn.Linear(8 * 30, 1),
)
print(
    'Model size is',
    sum([p.numel() for p in model.parameters() if p.requires_grad]),
    'trainable parameters'
)

#Train loader for batchifying train data
train_loader = DataLoader(list(zip(X_train_t, y_train_t)), shuffle=True, batch_size=3)

optimiser = torch.optim.NAdam(model.parameters())
loss_fn = nn.BCEWithLogitsLoss()

from collections import defaultdict
metrics_dict = defaultdict(list)

for epoch in range(n_epochs := 15):
    model.train()
    cum_loss = 0
    
    for X_minibatch, y_minibatch in train_loader:
        logits = model(X_minibatch).ravel()
        loss = loss_fn(logits, y_minibatch)
        
        optimiser.zero_grad()
        loss.backward()
        optimiser.step()
        
        cum_loss += loss.item() * len(X_minibatch)
    #/end of epoch
    
    time_to_print = (epoch == 0) or ((epoch + 1) % 1) == 0
    if not time_to_print:
        continue
    
    model.eval()
    
    with torch.no_grad():
        val_logits = model(X_val_t).ravel()
        trn_logits = model(X_train_t).ravel()
    
    val_loss = loss_fn(val_logits, y_val_t).item()
    trn_loss = cum_loss / len(X_train_t)
    
    val_acc = ((nn.Sigmoid()(val_logits) > 0.5).int() == y_val_t.int()).float().mean().item()
    trn_acc = ((nn.Sigmoid()(trn_logits) > 0.5).int() == y_train_t.int()).float().mean().item()
    print(
        f'[epoch {epoch + 1:>3d}]',
        f'trn loss: {trn_loss:>5.3f} [acc: {trn_acc:>8.3%}] |',
        f'val loss: {val_loss:>5.3f} [acc: {val_acc:>8.3%}]'
    )
    
    #Record metrics
    metrics_dict['epoch'].append(epoch + 1)
    metrics_dict['trn_loss'].append(trn_loss)
    metrics_dict['val_loss'].append(val_loss)
    metrics_dict['trn_acc'].append(trn_acc)
    metrics_dict['val_acc'].append(val_acc)

#View training curves
metrics_df = pd.DataFrame(metrics_dict).set_index('epoch')
ax = metrics_df.plot(
    use_index=True, y=['trn_loss', 'val_loss'], ylabel='loss',
    figsize=(8, 4), legend=False, linewidth=3, marker='s', markersize=7
)

metrics_df.mul(100).plot(
    use_index=True, y=['trn_acc', 'val_acc'], ylabel='acc', 
    linestyle='--', marker='o', ax=ax.twinx(), legend=False
)
ax.figure.legend(ncol=2)
ax.set_title('training curves')
© www.soinside.com 2019 - 2024. All rights reserved.