我正在使用 Keras,并配合 scikit-learn 的 GridSearchCV、KFold 以及 SciKeras 包装器。我想通过参数 validation_data 将 KFold 的验证折(validation fold)传递给模型的 fit 方法。我尝试了一些替代方案,但都没有成功。这是代码。
# Wrap the Keras model builder `get_NN` in a SciKeras classifier so it can be
# used as a scikit-learn estimator.
NN = KerasClassifier(
    model=get_NN,
    # Number of feature columns of the training frame; presumably forwarded to
    # get_NN to size the input layer — TODO confirm against get_NN.
    X_len = len(X_train.columns),
    loss="mse",
    optimizer="SGD",
    epochs=300,
    batch_size=4,
    shuffle=True,
    verbose=False,
    # fit__validation_data = # Here I should pass the validation data
    callbacks=[
        # Early stopping monitors "val_loss"; no validation data is supplied
        # in this call (see the commented-out line above), which is the
        # problem this snippet illustrates.
        tf.keras.callbacks.EarlyStopping(
            monitor="val_loss", min_delta=0.0001, patience=15, restore_best_weights=True
        )
    ]
)
# Scoring dictionary passed to GridSearchCV: metric name -> scorer.
custom_scores_monk = {
    "accuracy": "accuracy",
    # greater_is_better=False declares mean_squared_error as a loss
    # (lower is better) to make_scorer.
    "mse": make_scorer(mean_squared_error,greater_is_better=False)
}
# Parameter grid for the search. Every list holds a single candidate, so the
# grid contains exactly one parameter combination.
# NOTE(review): the "model__" prefix presumably routes each value to the
# get_NN builder — confirm against the SciKeras parameter-routing rules.
NN_MONK1_GRID_DICT = {
    "model__lr" : [0.5],
    "model__alpha" : [0.8],
    "model__hidden_activation" : ["tanh"],
    "model__neurons" : [4],
    "model__initializer" : ["glorot"],
    "model__nesterov" : [True],
    "model__penalty": [None],
    "model__lambda_reg": [None],
    "model__seed" : [15]
}
# Grid search over NN_MONK1_GRID_DICT using the cross-validation splitter CV
# (defined elsewhere) and the two scorers in custom_scores_monk.
grid = GridSearchCV(NN,
    param_grid=NN_MONK1_GRID_DICT,
    scoring=custom_scores_monk,
    # With several scorers, refit names the custom_scores_monk key used for it.
    refit="mse",
    cv=CV,
    return_train_score=True,
    n_jobs=-1
)
在各种替代方案中,我还尝试编写一个自定义回调,在 on_train_begin 中更新验证数据集,但这似乎是一种不干净的做法,所以它不起作用我并不感到惊讶。
class ValidationCallback(Callback):
    """Callback that tries to swap in the validation fold when training starts.

    Each time training begins, it picks the next index list from
    ``validation_split``, collects the matching rows of ``X`` and ``y`` and
    stores them on the model as ``fit__validation_data``.

    NOTE(review): the surrounding text states that this approach does not
    work; it only sets an attribute on the model object.
    """

    def __init__(self, X, y, validation_split):
        super().__init__()
        self.X, self.y = X, y
        # One sequence of row positions per fold.
        self.validation_split = validation_split
        # Index of the fold used by the next training run.
        self.count = 0

    def on_train_begin(self, logs=None):
        print(f"Training {self.count}")
        fold_rows = self.validation_split[self.count]
        X_val = [self.X.iloc[row] for row in fold_rows]
        y_val = [self.y.iloc[row] for row in fold_rows]
        self.count += 1
        self.model.fit__validation_data = (X_val, y_val)
相反,令我非常惊讶的是,对于像 KFold 交叉验证这样常见的任务竟然没有现成的解决方案,尤其是在 scikit-learn 框架下。特别是,这个问题导致无法使用“val_loss”作为早停(early stopping)的监控值,也无法绘制和比较训练与验证的学习曲线。
你有解决办法吗?
我花了大约一周的时间,终于找到了方法。
简短回答:不要这样做。只需手写一个用于网格搜索的临时方法并使用它即可。
长答案:您可以定义 SciKeras 包装器的子类,重写它的 `fit` 方法,以便把当前折(fold)的验证数据传递给它。
为此,您必须:
- 使用固定的 `random_state` 定义划分策略,并预先取出各个验证折,同时定义一个静态的折计数器:
# Define the split strategy. Shuffling uses a fixed random_state, so repeated
# calls to CV.split() on the same data produce the same folds.
CV = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
# Keep only the held-out (validation) row positions of every fold.
val_split = [held_out for _, held_out in CV.split(X_train, y_train)]
# For every fold, materialise the validation rows as plain Python lists:
# one (features, targets) tuple per fold, in fold order.
val_data = [
    (
        [X_train.iloc[row].tolist() for row in fold_rows],
        [y_train.iloc[row].tolist() for row in fold_rows],
    )
    for fold_rows in val_split
]
# static fold counter
def count():
    """Advance the fold counter by one and return its new value.

    The counter is stored as an attribute on this function object. It starts
    at -1, so the first call returns 0.
    """
    count.count += 1
    return count.count


def reset_counter():
    """Rewind the fold counter so that the next ``count()`` returns 0."""
    count.count = -1


def get_count():
    """Return the current value of the fold counter without changing it."""
    return count.count


# Initialise the counter at definition time; otherwise count()/get_count()
# raise AttributeError unless reset_counter() has been called first.
reset_counter()
- 定义一个静态寄存器,用来保存每一折训练得到的 `history` 对象,并提供对 K 个 history 求平均的工具函数:
# static history register
def histories():
    """Reset the history register to an empty list.

    The register is stored as an attribute on this function object.
    """
    histories.histories = []


def register(h):
    """Append the history ``h`` to the register."""
    histories.histories.append(h)


def get_histories():
    """Return the list of registered histories (the live list, not a copy)."""
    return histories.histories


def clear_histories():
    """Drop every registered history."""
    histories()


# Create the empty register at definition time; otherwise register() and
# get_histories() raise AttributeError unless histories() was called first.
histories()
# utilities to get the mean of K histories
def add_padding(ls, n):
    """Extend ``ls`` in place with ``n`` copies of its last element.

    Args:
        ls: list to pad; must be non-empty when ``n`` is positive.
        n: number of copies to append; a value <= 0 leaves ``ls`` untouched.

    Returns:
        The same list object ``ls``.
    """
    if n > 0:
        ls.extend([ls[-1]] * n)
    return ls


def mean_epochs(l):
    """Return the mean length of the 'loss' curves in ``l``, truncated to int.

    Args:
        l: non-empty sequence of history mappings, each with a 'loss' list.

    Raises:
        ValueError: if ``l`` is empty.
    """
    if not l:
        raise ValueError("mean_epochs() requires at least one history")
    epoch_counts = [len(item['loss']) for item in l]
    # Lengths are non-negative, so floor division equals int(mean(...)).
    return sum(epoch_counts) // len(epoch_counts)


def mean_history(_histories):
    """Average several training histories element-wise.

    Every curve is first brought to a common length ``m`` (mean number of
    epochs + 1): histories whose 'loss' curve has at least ``m`` entries are
    truncated, shorter ones are padded by repeating their last value. The
    input histories are not modified.

    Args:
        _histories: non-empty sequence of mappings ``field -> list of numbers``.
            The fields of the first history decide which curves are averaged;
            the length of 'loss' decides how each history is aligned.

    Returns:
        A dict mapping each field to the list of per-epoch means.
    """
    m = mean_epochs(_histories) + 1
    fields = list(_histories[0])

    aligned = []
    for history in _histories:
        epochs = len(history['loss'])
        curves = {}
        for field in fields:
            if epochs >= m:
                curves[field] = list(history[field][:m])
            else:
                # Pad a copy so the caller's list is left as it was.
                curves[field] = add_padding(list(history[field]), m - epochs)
        aligned.append(curves)

    return {
        field: [
            sum(values) / len(_histories)
            for values in zip(*(curves[field] for curves in aligned))
        ]
        for field in fields
    }
- 在包装器的子类中重写 `fit` 方法:
# KerasClassifier Wrapper for kfold
class KCWrapper(KerasClassifier):
    """KerasClassifier that passes a precomputed validation fold to each fit.

    Every call to ``fit`` takes the next entry of ``val_data`` (selected with
    the module-level fold counter ``count()``) and forwards it as
    ``validation_data``. The resulting ``history_`` is stored with
    ``register()``; once ``k`` fits have been counted, the mean of the
    registered histories is plotted.

    NOTE(review): ``count()`` keeps increasing across fits, so a fit beyond
    the ``k``-th (e.g. a refit or a second parameter combination) would index
    past ``val_data`` unless the counter is reset — confirm with the caller.
    """

    # You can pass the same parameters you passed to the KerasClassifier,
    # after val_data and k.
    def __init__(self, val_data, k, *args, **kwargs):
        """Store the folds and forward everything else to KerasClassifier.

        Args:
            val_data: sequence of validation sets, one per fold.
            k: number of folds.
            *args, **kwargs: passed unchanged to ``KerasClassifier``.
        """
        super().__init__(*args, **kwargs)
        self.val_data = val_data
        self.k = k

    def fit(self, X, y, **kwargs):
        """Fit on ``(X, y)`` using the current fold as validation data.

        Returns:
            Whatever the parent ``fit`` returns (the fitted estimator), as
            scikit-learn's estimator API expects; the original returned None.
        """
        fold_data = self.val_data[count()]
        fitted = super().fit(X, y, validation_data=fold_data, **kwargs)
        register(fitted.history_)
        # do_NN_plot(fitted.history_)  # plot single fold curve
        if self.kfold_finished():  # plot mean of k folds curves
            do_NN_plot(mean_history(get_histories()))
        return fitted

    def kfold_finished(self):
        """Return True when the fold counter has reached the last of k folds."""
        return self.k == get_count() + 1
# Define grids for gridsearchcv
# Keyword arguments shared with the plain KerasClassifier configuration; they
# are unpacked into the KCWrapper constructor.
kerasClassifierParams = {
    "model" : get_NN,
    # Number of feature columns; presumably consumed by get_NN — TODO confirm.
    "X_len" : len(X_train.columns),
    "loss" : "mse",
    "optimizer" : "SGD",
    "epochs" : 300,
    "batch_size" : 4,
    "shuffle" : True,
    "verbose" : False
}
# Build the fold-aware classifier: validation folds first, then the number of
# folds, then the usual KerasClassifier keyword arguments.
NN = KCWrapper(
    val_data,
    5, # 5-Fold
    callbacks=[
        # Early stopping monitors "val_loss", computed on the validation data
        # that KCWrapper.fit passes for the current fold.
        tf.keras.callbacks.EarlyStopping(
            monitor="val_loss", min_delta=0.0001, patience=20, restore_best_weights=True
        )
    ],
    **kerasClassifierParams
)
提供的代码还使用了绘制数据的例程:
def do_NN_plot(history):
    """Show the accuracy figure, then the loss (MSE) figure.

    Args:
        history: mapping with the keys 'binary_accuracy',
            'val_binary_accuracy', 'loss' and 'val_loss', each holding one
            value per epoch.
    """
    # (training key, validation key, title, y-axis label, legend position)
    panels = (
        ('binary_accuracy', 'val_binary_accuracy', 'model accuracy', 'accuracy', 'lower right'),
        ('loss', 'val_loss', 'model MSE', 'MSE', 'upper right'),
    )
    for train_key, val_key, title, y_label, legend_loc in panels:
        plt.plot(history[train_key])
        plt.plot(history[val_key], linestyle="--", color="orange")
        plt.title(title)
        plt.ylabel(y_label)
        plt.xlabel('epoch')
        plt.legend(['training', 'test'], loc=legend_loc)
        plt.show()
如果您正在处理回归任务,则可以使用回归器(的包装器)的包装器执行相同的操作:
# Define the split strategy. Shuffling uses a fixed random_state, so repeated
# calls to CV.split() on the same data produce the same folds.
# NOTE(review): StratifiedKFold is documented for classification targets; for
# a continuous regression target a plain KFold is presumably required —
# confirm against the data actually used.
CV = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
# Keep only the held-out (validation) row positions of every fold.
val_split = [held_out for _, held_out in CV.split(X_train, y_train)]
# For every fold, materialise the validation rows as plain Python lists:
# one (features, targets) tuple per fold, in fold order.
val_data = [
    (
        [X_train.iloc[row].tolist() for row in fold_rows],
        [y_train.iloc[row].tolist() for row in fold_rows],
    )
    for fold_rows in val_split
]
# static fold counter
def count():
    """Advance the fold counter by one and return its new value.

    The counter is stored as an attribute on this function object. It starts
    at -1, so the first call returns 0.
    """
    count.count += 1
    return count.count


def reset_counter():
    """Rewind the fold counter so that the next ``count()`` returns 0."""
    count.count = -1


def get_count():
    """Return the current value of the fold counter without changing it."""
    return count.count


# Initialise the counter at definition time; otherwise count()/get_count()
# raise AttributeError unless reset_counter() has been called first.
reset_counter()
# static history register
def histories():
    """Reset the history register to an empty list.

    The register is stored as an attribute on this function object.
    """
    histories.histories = []


def register(h):
    """Append the history ``h`` to the register."""
    histories.histories.append(h)


def get_histories():
    """Return the list of registered histories (the live list, not a copy)."""
    return histories.histories


def clear_histories():
    """Drop every registered history."""
    histories()


# Create the empty register at definition time; otherwise register() and
# get_histories() raise AttributeError unless histories() was called first.
histories()
# utilities to get the mean of K histories
def add_padding(ls, n):
    """Extend ``ls`` in place with ``n`` copies of its last element.

    Args:
        ls: list to pad; must be non-empty when ``n`` is positive.
        n: number of copies to append; a value <= 0 leaves ``ls`` untouched.

    Returns:
        The same list object ``ls``.
    """
    if n > 0:
        ls.extend([ls[-1]] * n)
    return ls


def mean_epochs(l):
    """Return the mean length of the 'loss' curves in ``l``, truncated to int.

    Args:
        l: non-empty sequence of history mappings, each with a 'loss' list.

    Raises:
        ValueError: if ``l`` is empty.
    """
    if not l:
        raise ValueError("mean_epochs() requires at least one history")
    epoch_counts = [len(item['loss']) for item in l]
    # Lengths are non-negative, so floor division equals int(mean(...)).
    return sum(epoch_counts) // len(epoch_counts)


def mean_history(_histories):
    """Average several training histories element-wise.

    Every curve is first brought to a common length ``m`` (mean number of
    epochs + 1): histories whose 'loss' curve has at least ``m`` entries are
    truncated, shorter ones are padded by repeating their last value. The
    input histories are not modified.

    Args:
        _histories: non-empty sequence of mappings ``field -> list of numbers``.
            The fields of the first history decide which curves are averaged;
            the length of 'loss' decides how each history is aligned.

    Returns:
        A dict mapping each field to the list of per-epoch means.
    """
    m = mean_epochs(_histories) + 1
    fields = list(_histories[0])

    aligned = []
    for history in _histories:
        epochs = len(history['loss'])
        curves = {}
        for field in fields:
            if epochs >= m:
                curves[field] = list(history[field][:m])
            else:
                # Pad a copy so the caller's list is left as it was.
                curves[field] = add_padding(list(history[field]), m - epochs)
        aligned.append(curves)

    return {
        field: [
            sum(values) / len(_histories)
            for values in zip(*(curves[field] for curves in aligned))
        ]
        for field in fields
    }
def do_NN_plot(history):
    """Show the accuracy figure, then the loss (MSE) figure.

    Args:
        history: mapping with the keys 'binary_accuracy',
            'val_binary_accuracy', 'loss' and 'val_loss', each holding one
            value per epoch.
    """
    # (training key, validation key, title, y-axis label, legend position)
    panels = (
        ('binary_accuracy', 'val_binary_accuracy', 'model accuracy', 'accuracy', 'lower right'),
        ('loss', 'val_loss', 'model MSE', 'MSE', 'upper right'),
    )
    for train_key, val_key, title, y_label, legend_loc in panels:
        plt.plot(history[train_key])
        plt.plot(history[val_key], linestyle="--", color="orange")
        plt.title(title)
        plt.ylabel(y_label)
        plt.xlabel('epoch')
        plt.legend(['training', 'test'], loc=legend_loc)
        plt.show()
# KerasRegressor Wrapper for kfold
class KRWrapper(KerasRegressor):
    """KerasRegressor that passes a precomputed validation fold to each fit.

    Every call to ``fit`` takes the next entry of ``val_data`` (selected with
    the module-level fold counter ``count()``) and forwards it as
    ``validation_data``. The resulting ``history_`` is stored with
    ``register()``; once ``k`` fits have been counted, the mean of the
    registered histories is plotted.

    NOTE(review): ``count()`` keeps increasing across fits, so a fit beyond
    the ``k``-th (e.g. a refit or a second parameter combination) would index
    past ``val_data`` unless the counter is reset — confirm with the caller.
    """

    def __init__(self, val_data, k, *args, **kwargs):
        """Store the folds and forward everything else to KerasRegressor.

        Args:
            val_data: sequence of validation sets, one per fold.
            k: number of folds.
            *args, **kwargs: passed unchanged to ``KerasRegressor``.
        """
        super().__init__(*args, **kwargs)
        self.val_data = val_data
        self.k = k

    def fit(self, X, y, **kwargs):
        """Fit on ``(X, y)`` using the current fold as validation data.

        Returns:
            Whatever the parent ``fit`` returns (the fitted estimator), as
            scikit-learn's estimator API expects; the original returned None.
        """
        fold_data = self.val_data[count()]
        fitted = super().fit(X, y, validation_data=fold_data, **kwargs)
        register(fitted.history_)
        # do_NN_plot(fitted.history_)  # plot single fold curve
        if self.kfold_finished():  # plot mean of k folds curves
            do_NN_plot(mean_history(get_histories()))
        return fitted

    def kfold_finished(self):
        """Return True when the fold counter has reached the last of k folds."""
        return self.k == get_count() + 1
# Define grids for gridsearchcv
# Keyword arguments for the underlying KerasRegressor; they are unpacked into
# the KRWrapper constructor.
kerasRegressorParams = {
    "model" : get_NN,
    # Number of feature columns; presumably consumed by get_NN — TODO confirm.
    "X_len" : len(X_train.columns),
    # mee_NN is a loss defined elsewhere in the project.
    "loss" : mee_NN,
    "optimizer" : "SGD", # fixed into get_NN
    "batch_size" : 32,
    "epochs" : 2000,
    "shuffle" : True,
    "verbose" : 0
}
# Build the fold-aware regressor: validation folds first, then the number of
# folds (5), then the usual KerasRegressor keyword arguments.
NN = KRWrapper(
    val_data,
    5,
    callbacks=[
        # Early stopping monitors "val_loss", computed on the validation data
        # that KRWrapper.fit passes for the current fold.
        tf.keras.callbacks.EarlyStopping(
            monitor="val_loss", min_delta=0.000001, patience=50, restore_best_weights=True
        )
    ],
    **kerasRegressorParams
)
这满足了我的好奇心和固执,但它是一个不干净的解决方案(尽管它仍然算是一个解决方案 :P)。正如我一开始所说:只需手写一个用于网格搜索的临时方法并使用它即可。上面提出的解决方案无法利用 scikit-learn 的 GridSearchCV 自带的并行化,因此做了大量其实毫无必要的工作。