Passing KFold validation_data to Keras model.fit when using Scikit-Learn GridSearchCV

Problem description (votes: 0, answers: 1)

I'm working in Keras with Scikit-Learn's GridSearchCV, KFold, and the SciKeras wrapper. I want to pass KFold's validation folds to the model's fit method through the validation_data parameter. I've tried several alternatives but couldn't make it work. Here is the code.

NN = KerasClassifier(
  model=get_NN,
  X_len = len(X_train.columns),
  loss="mse",
  optimizer="SGD",
  epochs=300,
  batch_size=4,
  shuffle=True,
  verbose=False,
  # fit__validation_data = # Here I should pass the validation data
  callbacks=[
    tf.keras.callbacks.EarlyStopping(
      monitor="val_loss", min_delta=0.0001, patience=15, restore_best_weights=True
    )
  ]
)
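
(Aside: if I read the SciKeras docs right, constructor parameters prefixed with fit__ are routed to model.fit, so a fixed hold-out set could be passed as sketched below; X_val and y_val are hypothetical names here. The problem is that this is static, while I need the validation set to follow the current KFold fold.)

# sketch with SciKeras routed parameters, assuming a fixed hold-out
# split (X_val, y_val): every call to fit would get the SAME validation
# set, so this does not solve the per-fold problem
NN_static = KerasClassifier(
  model=get_NN,
  X_len=len(X_train.columns),
  loss="mse",
  optimizer="SGD",
  epochs=300,
  batch_size=4,
  fit__validation_data=(X_val, y_val),  # hypothetical fixed split
)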

custom_scores_monk = {
    "accuracy": "accuracy",
    "mse": make_scorer(mean_squared_error,greater_is_better=False)
}

NN_MONK1_GRID_DICT = {
  "model__lr" : [0.5], 
  "model__alpha" : [0.8],
  "model__hidden_activation" : ["tanh"],
  "model__neurons" : [4], 
  "model__initializer" : ["glorot"], 
  "model__nesterov" : [True], 
  "model__penalty": [None], 
  "model__lambda_reg": [None],
  "model__seed" : [15]
}

grid = GridSearchCV(NN,
                    param_grid=NN_MONK1_GRID_DICT,
                    scoring=custom_scores_monk,
                    refit="mse",
                    cv=CV,
                    return_train_score=True,
                    n_jobs=-1
        )

Among other alternatives, I tried writing a custom callback that updates the dataset in on_train_begin, but this felt like a dirty approach and I'm not surprised it doesn't work.

class ValidationCallback(Callback):
  def __init__(self, X, y, validation_split):
    super().__init__()
    self.X = X
    self.y = y
    self.validation_split = validation_split
    self.count = 0

  def on_train_begin(self, logs=None):
    print("Training " + str(self.count))
    indexes = self.validation_split[self.count]
    X_val, y_val = [self.X.iloc[i] for i in indexes], [self.y.iloc[i] for i in indexes]
    self.count = self.count+1
    self.model.fit__validation_data = (X_val, y_val)  # has no effect: Keras never reads this attribute

On the contrary, I'm quite surprised there is no ready-made solution for a task as common as KFold cross-validation, especially within the sklearn framework. In particular, this problem makes it impossible to use "val_loss" as the monitored quantity for early stopping, besides preventing plotting and comparing training and validation learning curves.

Do you have a solution?

keras scikit-learn gridsearchcv k-fold
1 Answer

0 votes

I spent about a week on this, and I finally found a way.

Short answer: don't do it. Just hand-write an ad-hoc method for the grid search and use that.

Long answer: you can define a subclass of the SciKeras wrapper that redefines the fit method so that the current fold is passed to it. To do so, you have to:

  1. Know which folds will be used, so you must set random_state in the CV object:
    # define a split strategy using a random_state
    CV = StratifiedKFold(n_splits=5, random_state=42, shuffle=True)
    
    # get the validation folds
    val_split = [ test for (train, test) in CV.split(X_train, y_train) ]
    
    val_data = [ 
      (
        [X_train.iloc[i].tolist() for i in indexes], 
        [y_train.iloc[i].tolist() for i in indexes]
      ) for indexes in val_split 
    ]
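    # hypothetical sanity check (not in the original answer): a fixed
    # random_state makes the splitter deterministic, so these folds are
    # exactly the ones GridSearchCV will regenerate internally
    for (a, b) in zip(CV.split(X_train, y_train), CV.split(X_train, y_train)):
      assert (a[1] == b[1]).all()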
  2. Define a "static" fold counter:
    # static fold counter
    def count():
      count.count += 1
      return count.count
    
    def reset_counter():
      count.count = -1
    
    def get_count():
      return count.count
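    # usage sketch: count.count does not exist until reset_counter() is
    # called, so call it once before every grid search
    reset_counter()
    assert count() == 0   # first fold
    assert count() == 1   # second fold
    reset_counter()       # back to -1 for the next search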
  3. In the same way, you have to define a registry to store the various history objects:
    # static history register
    def histories():
      histories.histories = []
    
    def register(h):
      histories.histories.append(h)
    
    def get_histories():
      return histories.histories
    
    def clear_histories():
      histories()
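    # usage sketch: histories() initializes the registry, and
    # clear_histories() resets it by simply calling histories() again
    histories()
    register({'loss': [1.0, 0.5]})   # toy history dict
    assert get_histories() == [{'loss': [1.0, 0.5]}]
    clear_histories()
    assert get_histories() == []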
  4. Define a method that computes the mean of the K histories. This is what makes early stopping on the validation loss workable: each fold may stop at a different epoch, so the histories are truncated or padded to a common length before averaging:
    # utilities to get the mean of K histories
    from statistics import mean  # mean_epochs below relies on mean()
    
    def add_padding(ls, n):
      ls.extend([ls[-1]] * n)
      return ls
    
    def mean_epochs(l):
      return int(mean([ len(item['loss']) for item in l ]))
    
    def mean_history(_histories):
      m = mean_epochs(_histories)+1
      for history in _histories:
        l = len(history['loss'])
        for field in _histories[0]:
          if l >= m:
            history[field] = history[field][:m]
          else:
            history[field] = add_padding(history[field], (m-l))
      return \
        { field : 
            [ 
              (sum(x)/len(_histories)) for x in zip(
                *[ history[field] for history in _histories ]
              )
            ] for field in _histories[0]
        }
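    # toy example (made-up numbers) of what mean_history computes:
    h1 = {'loss': [4.0, 2.0, 1.0]}   # this fold ran for 3 epochs
    h2 = {'loss': [4.0, 3.0]}        # early stopping ended this one at 2
    # mean_epochs = int(mean([3, 2])) = 2, so m = 3: h1 keeps all 3
    # entries, h2 is padded with its last value to [4.0, 3.0, 3.0]
    assert mean_history([h1, h2]) == {'loss': [4.0, 2.5, 2.0]}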
  5. Extend the SciKeras wrapper class, redefining the fit method:
    # KerasClassifier Wrapper for kfold
    class KCWrapper(KerasClassifier):
    
      # you can pass the same parameters you passed to the KerasClassifier, after val_data and k
      def __init__(self, val_data, k, *args, **kwargs):
        super(KCWrapper, self).__init__(*args, **kwargs)
        self.val_data = val_data
        self.k = k
      
      def fit(self, X, y, **kwargs):
        h = super().fit(X, y, validation_data=self.val_data[count()], **kwargs)
        register(h.history_)
        # do_NN_plot(h.history_)  # plot single fold curve
        if self.kfold_finished(): # plot mean of k folds curves
          do_NN_plot(mean_history(get_histories()))
        return h  # fit must return the fitted estimator for sklearn
        
      def kfold_finished(self):
        return self.k == get_count()+1
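    # sequencing note (my reading of the counter logic): each fit call
    # consumes the next fold, so reset_counter() must run right before
    # GridSearchCV.fit; the final refit and any extra parameter
    # combination keep incrementing the counter past k-1, where
    # val_data[count()] raises IndexError, so count() % k would be safer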
  6. Instantiate the classifier (a wrapper of the wrapper):
    # Define grids for gridsearchcv
    kerasClassifierParams = {
      "model" : get_NN,
      "X_len" : len(X_train.columns),
      "loss" : "mse",
      "optimizer" : "SGD",
      "epochs" : 300,
      "batch_size" : 4,
      "shuffle" : True,
      "verbose" : False
    }

    NN = KCWrapper(
      val_data,
      5, # 5-Fold
      callbacks=[
        tf.keras.callbacks.EarlyStopping(
          monitor="val_loss", min_delta=0.0001, patience=20, restore_best_weights=True
        )
      ],
      **kerasClassifierParams
    )
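
For completeness, a sketch of how the wrapper plugs into the search, reusing the grid dictionary and scorers from the question (and assuming do_NN_plot, shown below, is already defined). The static counter and registry are process-local state, so n_jobs has to stay at 1:

    # re-initialize the shared state, then run the search sequentially
    reset_counter()
    histories()
    grid = GridSearchCV(NN,
                        param_grid=NN_MONK1_GRID_DICT,
                        scoring=custom_scores_monk,
                        refit="mse",
                        cv=CV,
                        return_train_score=True,
                        n_jobs=1)  # parallel workers would not share count()
    grid.fit(X_train, y_train)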

The code above also uses a routine that plots the data:

    import matplotlib.pyplot as plt

    def do_NN_plot(history):
    
      # Plot accuracy
      plt.plot(history['binary_accuracy'])
      plt.plot(history['val_binary_accuracy'], linestyle="--", color="orange")
      plt.title('model accuracy')
      plt.ylabel('accuracy')
      plt.xlabel('epoch')
      plt.legend(['training', 'validation'], loc='lower right')
      plt.show()
    
      # Plot loss
      plt.plot(history['loss'])
      plt.plot(history['val_loss'], linestyle="--", color="orange")
      plt.title('model MSE')
      plt.ylabel('MSE')
      plt.xlabel('epoch')
      plt.legend(['training', 'validation'], loc='upper right')
      plt.show()

If you're working on a regression task, you can do the same with a (wrapper of the) regressor wrapper. The split strategy, fold counter, history registry, averaging utilities, and plotting routine are exactly the same as above, so only the regressor-specific parts are shown:



    # KerasRegressor Wrapper for kfold
    class KRWrapper(KerasRegressor):
    
      def __init__(self, val_data, k, *args, **kwargs):
        super(KRWrapper, self).__init__(*args, **kwargs)
        self.val_data = val_data
        self.k = k
        
      def fit(self, X, y, **kwargs):
        h = super().fit(X, y, validation_data=self.val_data[count()], **kwargs)
        register(h.history_)
        # do_NN_plot(h.history_)  # plot single fold curve
        if self.kfold_finished(): # plot mean of k folds curves
          do_NN_plot(mean_history(get_histories()))
        return h  # fit must return the fitted estimator for sklearn
        
      def kfold_finished(self):
        return self.k == get_count()+1

    # Define grids for gridsearchcv
    kerasRegressorParams = {
      "model" : get_NN,
      "X_len" : len(X_train.columns),
      "loss" : mee_NN,
      "optimizer" : "SGD", # fixed into get_NN
      "batch_size" : 32,
      "epochs" : 2000,
      "shuffle" : True,
      "verbose" : 0
    }
    
    NN = KRWrapper(
      val_data,
      5,
      callbacks=[
        tf.keras.callbacks.EarlyStopping(
          monitor="val_loss", min_delta=0.000001, patience=50, restore_best_weights=True
        )
      ],
      **kerasRegressorParams
    )

This satisfied my curiosity and my stubbornness, but it is a dirty solution (even if it is still a solution :P). As I said at the beginning: just hand-write an ad-hoc method for the grid search and use that. Because the fold counter and history registry are shared state, the solution above also rules out the parallelization that sklearn's GridSearchCV offers natively, so it is a lot of work for very little gain.
