How can I speed up nested cross-validation in Python?

Question (votes: 6, answers: 4)

From what I've found, there is another question like this (Speed-up nested cross-validation), but installing MPI did not work for me even after trying several of the fixes suggested both on this site and by Microsoft, so I'm hoping there is another package, or another answer to this question.

I want to compare multiple algorithms and grid-search a wide range of parameters (maybe too many parameters?). Are there any ways other than mpi4py to speed up my code? As I understand it, I can't use n_jobs=-1, because then the nesting is lost?

Note also that I cannot run this with the many parameters I'd like to try below (it runs longer than the time I have). If I give each model only 2 parameters to compare, results only come back after 2 hours. I'm also running this code on a dataset of 252 rows and 25 feature columns, with 4 categorical target values, to predict whether a gene (out of 252 genes) affects a disease ("certain", "likely", "possible" or "unlikely"). Using SMOTE increases the sample size to 420, which is what is then used.

dataset= pd.read_csv('data.csv')
data = dataset.drop(["gene"], axis=1)
df = data.iloc[:,0:24]
df = df.fillna(0)
X = MinMaxScaler().fit_transform(df)

le = preprocessing.LabelEncoder()
encoded_value = le.fit_transform(["certain", "likely", "possible", "unlikely"])
Y = le.fit_transform(data["category"])

sm = SMOTE(random_state=100)
X_res, y_res = sm.fit_resample(X, Y)

seed = 7
logreg = LogisticRegression(penalty='l1', solver='liblinear',multi_class='auto')
LR_par= {'penalty':['l1'], 'C': [0.5, 1, 5, 10], 'max_iter':[500, 1000, 5000]}

rfc =RandomForestClassifier()
param_grid = {'bootstrap': [True, False],
              'max_depth': [10, 20, 30, 40, 50, 60, 70, 80, 90, 100, None],
              'max_features': ['auto', 'sqrt'],
              'min_samples_leaf': [1, 2, 4,25],
              'min_samples_split': [2, 5, 10, 25],
              'n_estimators': [200, 400, 600, 800, 1000, 1200, 1400, 1600, 1800, 2000]}

mlp = MLPClassifier(random_state=seed)
parameter_space = {'hidden_layer_sizes': [(10,20), (10,20,10), (50,)],
     'activation': ['tanh', 'relu'],
     'solver': ['adam', 'sgd'],
     'max_iter': [10000],
     'alpha': [0.1, 0.01, 0.001],
     'learning_rate': ['constant','adaptive']}

gbm = GradientBoostingClassifier(min_samples_split=25, min_samples_leaf=25)
param = {"loss":["deviance"],
    "learning_rate": [0.15,0.1,0.05,0.01,0.005,0.001],
    "min_samples_split": [2, 5, 10, 25],
    "min_samples_leaf": [1, 2, 4,25],
    "max_depth":[10, 20, 30, 40, 50, 60, 70, 80, 90, 100, None],
    "max_features":['auto', 'sqrt'],
    "criterion": ["friedman_mse"],
    "n_estimators":[200, 400, 600, 800, 1000, 1200, 1400, 1600, 1800, 2000]
    }

svm = SVC(gamma="scale", probability=True)
tuned_parameters = {'kernel':('linear', 'rbf'), 'C':(1,0.25,0.5,0.75)}

def baseline_model(optimizer='adam', learn_rate=0.01):
    model = Sequential()
    model.add(Dense(100, input_dim=X_res.shape[1], activation='relu')) 
    model.add(Dropout(0.5))
    model.add(Dense(50, activation='relu')) #8 is the dim/ the number of hidden units (units are the kernel)
    model.add(Dense(4, activation='softmax'))
    model.compile(loss='categorical_crossentropy', optimizer=optimizer, metrics=['accuracy'])
    return model

keras = KerasClassifier(build_fn=baseline_model, batch_size=32, epochs=100, verbose=0)
learn_rate = [0.001, 0.01, 0.1, 0.2, 0.3]
optimizer = ['SGD', 'RMSprop', 'Adagrad', 'Adadelta', 'Adam', 'Adamax', 'Nadam']
kerasparams = dict(optimizer=optimizer, learn_rate=learn_rate)

inner_cv = KFold(n_splits=10, shuffle=True, random_state=seed)
outer_cv = KFold(n_splits=10, shuffle=True, random_state=seed)

models = []
models.append(('GBM', GridSearchCV(gbm, param, cv=inner_cv,iid=False, n_jobs=1)))
models.append(('RFC', GridSearchCV(rfc, param_grid, cv=inner_cv,iid=False, n_jobs=1)))
models.append(('LR', GridSearchCV(logreg, LR_par, cv=inner_cv, iid=False, n_jobs=1)))
models.append(('SVM', GridSearchCV(svm, tuned_parameters, cv=inner_cv, iid=False, n_jobs=1)))
models.append(('MLP', GridSearchCV(mlp, parameter_space, cv=inner_cv,iid=False, n_jobs=1)))
models.append(('Keras', GridSearchCV(estimator=keras, param_grid=kerasparams, cv=inner_cv,iid=False, n_jobs=1)))


results = []
names = []
scoring = 'accuracy'
X_train, X_test, Y_train, Y_test = train_test_split(X_res, y_res, test_size=0.2, random_state=0)


for name, model in models:
    nested_cv_results = model_selection.cross_val_score(model, X_res, y_res, cv=outer_cv, scoring=scoring)
    results.append(nested_cv_results)
    names.append(name)
    msg = "Nested CV Accuracy %s: %f (+/- %f )" % (name, nested_cv_results.mean()*100, nested_cv_results.std()*100)
    print(msg)
    model.fit(X_train, Y_train)
    print('Test set accuracy: {:.2f}'.format(model.score(X_test, Y_test)*100),  '%')
    print("Best Parameters: \n{}\n".format(model.best_params_))
    print("Best CV Score: \n{}\n".format(model.best_score_))

For example, most of the dataset consists of binary values, like this:

gene   Tissue    Druggable Eigenvalue CADDvalue Catalogpresence   Category
ACE      1           1         1          0           1            Certain
ABO      1           0         0          0           0            Likely
TP53     1           1         0          0           0            Possible

Any guidance on how to speed this up would be greatly appreciated.

Edit: I also tried using dask for parallel processing, but I'm not sure I'm doing it right, and it doesn't seem to run any faster:

for name, model in models:
    with joblib.parallel_backend('dask'):
        nested_cv_results = model_selection.cross_val_score(model, X_res, y_res, cv=outer_cv, scoring=scoring)
        results.append(nested_cv_results)
        names.append(name)
        msg = "Nested CV Accuracy %s: %f (+/- %f )" % (name, nested_cv_results.mean()*100, nested_cv_results.std()*100)
        print(msg)
        model.fit(X_train, Y_train)
        print('Test set accuracy: {:.2f}'.format(model.score(X_test, Y_test)*100),  '%')
    #print("Best Estimator: \n{}\n".format(model.best_estimator_))
        print("Best Parameters: \n{}\n".format(model.best_params_))
        print("Best CV Score: \n{}\n".format(model.best_score_)) #average of all cv folds for a single combination of the parameters you specify 

Edit: Also, regarding reducing the grid search, I have already tried it with just 5 parameters per model, and that still takes several hours to complete. So while trimming the number of parameters will help, any advice on efficiency beyond that would be gratefully received.

python parallel-processing scikit-learn dask cross-validation
4 Answers
2 votes

Two things:

  1. Instead of GridSearch, try HyperOpt - a Python library for serial and parallel optimization.
  2. I would reduce the dimensionality with UMAP or PCA. UMAP is probably the better choice.

After applying SMOTE:

import umap

dim_reduced = umap.UMAP(
        min_dist=min_dist,        # e.g. 0.1 -- placeholder, tune for your data
        n_neighbors=neighbours,   # e.g. 15 -- placeholder, tune for your data
        random_state=1234,
    ).fit_transform(smote_output) # smote_output = the SMOTE-resampled X

You can then use dim_reduced for the train/test split.

Reducing the dimensionality will help remove noise from the data: instead of dealing with 25 features, you bring them down to 2 (with UMAP) or to whatever number of components you choose (with PCA), which should have a significant impact on performance.
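A sketch of the PCA route with scikit-learn, using random stand-in data with the question's post-SMOTE shape (the real dataset isn't available here); the component count of 5 is an assumption to tune:

```python
import numpy as np
from sklearn.decomposition import PCA
from sklearn.model_selection import train_test_split

rng = np.random.RandomState(0)
X_res = rng.rand(420, 25)           # stand-in for the SMOTE-resampled features
y_res = rng.randint(0, 4, 420)      # stand-in for the four categories

pca = PCA(n_components=5, random_state=0)   # keep e.g. 5 components
X_reduced = pca.fit_transform(X_res)

# the reduced matrix replaces X_res in the train/test split
X_train, X_test, Y_train, Y_test = train_test_split(
    X_reduced, y_res, test_size=0.2, random_state=0)
print(X_reduced.shape)  # (420, 5)
```

With 5 components instead of 25 features, every inner-CV fit in the grid search works on a much smaller matrix, which is where the speed-up comes from.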


3 votes

Dask-ML has scalable implementations of GridSearchCV and RandomizedSearchCV that are, I believe, drop-in replacements for Scikit-Learn's. They were developed alongside the Scikit-Learn developers.

They can be faster for two reasons: they avoid repeated work, and they can scale out to a cluster of machines.


1 vote

An easy win in your case would be... to start using parallel processing :). dask will help if you have a cluster (it will work on a single machine, but the improvement over sklearn's default scheduling is not significant), but if you plan to run it on a single machine (with multiple cores/threads and "enough" memory), then you can run the nested CV in parallel. The only trick is that sklearn will not allow you to run the outer CV loop in multiple processes. However, it will allow you to run the inner loop in multiple threads.

At the moment you have n_jobs=None in the outer CV loop (that's the default in cross_val_score), which means n_jobs=1, and that is the only option you can use with sklearn in a nested CV.

However, you can get an easy gain by setting n_jobs=some_reasonable_number in every GridSearchCV you use. some_reasonable_number does not have to be -1 (though that is a good starting point). Some algorithms either plateau at n_jobs=n_cores rather than n_threads (e.g., xgboost), or already have built-in multiprocessing (e.g., RandomForestClassifier), and may clash if you spawn too many processes.
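A sketch of that setup on the iris toy data with a deliberately tiny grid: the inner GridSearchCV gets a multi-core n_jobs while the outer cross_val_score keeps its serial default.

```python
from sklearn.datasets import load_iris
from sklearn.model_selection import GridSearchCV, KFold, cross_val_score
from sklearn.svm import SVC

X, y = load_iris(return_X_y=True)
inner_cv = KFold(n_splits=3, shuffle=True, random_state=7)
outer_cv = KFold(n_splits=3, shuffle=True, random_state=7)

grid = GridSearchCV(SVC(gamma="scale"),
                    {"C": [0.5, 1], "kernel": ["linear", "rbf"]},
                    cv=inner_cv,
                    n_jobs=2)   # inner loop: candidate fits run in parallel

# outer loop: n_jobs left at the default, as described above
scores = cross_val_score(grid, X, y, cv=outer_cv)
print(scores.mean())
```

Each outer fold still runs sequentially, but inside it the 4 parameter candidates × 3 inner folds are dispatched across the workers that n_jobs allows.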


1 vote

IIUC, you are trying to parallelize this example from the sklearn docs. If that is the case, then here is one possible approach to address why dask is not working, and to offer some constructive guidance or further knowledge on this problem.

General imports

import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from sklearn import preprocessing
from imblearn.over_sampling import SMOTE
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.svm import SVC
from sklearn.model_selection import cross_val_score, GridSearchCV, KFold, train_test_split
from sklearn.neural_network import MLPClassifier
import dask_ml.model_selection as dcv


import time

Data

  • I defined 3 datasets to try out dask_ml's implementation
  • The size (#rows) of the third one (dataset 3) is adjustable and can be increased freely depending on your computing power; I timed the code execution with this dataset only
  • The code below works for all 3 datasets
  • Dataset 1 is a slightly longer version of the sample data in the SO question
#### Dataset 1 - longer version of data in the question
d = """gene Tissue Druggable Eigenvalue CADDvalue Catalogpresence Category
ACE 1 1 1 0 1 Certain
ABO 1 0 0 0 0 Likely
TP53 1 1 0 0 0 Possible"""
data = pd.DataFrame([x.split(' ') for x in d.split('\n')])
data.columns = data.loc[0,:]
data.drop(0, axis=0, inplace=True)
data = pd.concat([data]*15)

data = data.drop(["gene"], axis=1)
df = data.iloc[:,0:5]

X = MinMaxScaler().fit_transform(df)
le = preprocessing.LabelEncoder()
encoded_value = le.fit_transform(["Certain", "Likely", "Possible"])
Y = le.fit_transform(data["Category"])

sm = SMOTE(random_state=100)
X_res, y_res = sm.fit_resample(X, Y)
#### Dataset 2 - iris dataset from example in sklearn nested cross validation docs
# Load the dataset
from sklearn.datasets import load_iris
iris = load_iris()
X_res = iris.data
y_res = iris.target
#### Dataset 3 - size (#rows, #columns) is adjustable (I used this to time code execution)
X_res = pd.DataFrame(np.random.rand(300,50), columns=['col_'+str(c+1) for c in list(range(50))])
from random import shuffle
cats = ["paris", "barcelona", "kolkata", "new york", 'sydney']
y_values = cats*int(len(X_res)/len(cats))
shuffle(y_values)
y_res = pd.Series(y_values)

Instantiate the classifiers - no changes to the code from the question

seed = 7
logreg = LogisticRegression(penalty='l1', solver='liblinear',multi_class='auto')
LR_par= {'penalty':['l1'], 'C': [0.5, 1, 5, 10], 'max_iter':[500, 1000, 5000]}

mlp = MLPClassifier(random_state=seed)
parameter_space = {'hidden_layer_sizes': [(10,20), (10,20,10), (50,)],
     'activation': ['tanh', 'relu'],
     'solver': ['adam', 'sgd'],
     'max_iter': [10000],
     'alpha': [0.1, 0.01, 0.001],
     'learning_rate': ['constant','adaptive']}

rfc =RandomForestClassifier()
param_grid = {'bootstrap': [True, False],
              'max_depth': [10, 20, 30, 40, 50, 60, 70, 80, 90, 100, None],
              'max_features': ['auto', 'sqrt'],
              'min_samples_leaf': [1, 2, 4,25],
              'min_samples_split': [2, 5, 10, 25],
              'n_estimators': [200, 400, 600, 800, 1000, 1200, 1400, 1600, 1800, 2000]}

gbm = GradientBoostingClassifier(min_samples_split=25, min_samples_leaf=25)
param = {"loss":["deviance"],
    "learning_rate": [0.15,0.1,0.05,0.01,0.005,0.001],
    "min_samples_split": [2, 5, 10, 25],
    "min_samples_leaf": [1, 2, 4,25],
    "max_depth":[10, 20, 30, 40, 50, 60, 70, 80, 90, 100, None],
    "max_features":['auto', 'sqrt'],
    "criterion": ["friedman_mse"],
    "n_estimators":[200, 400, 600, 800, 1000, 1200, 1400, 1600, 1800, 2000]
    }

svm = SVC(gamma="scale", probability=True)
tuned_parameters = {'kernel':('linear', 'rbf'), 'C':(1,0.25,0.5,0.75)}
inner_cv = KFold(n_splits=10, shuffle=True, random_state=seed)
outer_cv = KFold(n_splits=10, shuffle=True, random_state=seed)

Use GridSearchCV as implemented by dask_ml (originally suggested by @MRocklin here) - see the dask_ml docs for dask_ml.model_selection.GridSearchCV

  • For brevity I have excluded KerasClassifier and the helper function baseline_model(), but my approach to handling the former would be the same as for the other models
models = []
models.append(('MLP', dcv.GridSearchCV(mlp, parameter_space, cv=inner_cv,iid=False, n_jobs=1)))
models.append(('GBM', dcv.GridSearchCV(gbm, param, cv=inner_cv,iid=False, n_jobs=1)))
models.append(('RFC', dcv.GridSearchCV(rfc, param_grid, cv=inner_cv,iid=False, n_jobs=1)))
models.append(('LR', dcv.GridSearchCV(logreg, LR_par, cv=inner_cv, iid=False, n_jobs=1)))
models.append(('SVM', dcv.GridSearchCV(svm, tuned_parameters, cv=inner_cv, iid=False, n_jobs=1)))

Initialize an additional empty list to hold the non-nested CV results

non_nested_results = []
nested_results = []
names = []
scoring = 'accuracy'
X_train, X_test, Y_train, Y_test = train_test_split(X_res, y_res, test_size=0.2, random_state=0)

Joblib and dask client setup

# Create a local cluster
from dask.distributed import Client
client = Client(processes=False, threads_per_worker=4,
        n_workers=1, memory_limit='6GB')
from sklearn.externals import joblib

Perform the nested CV, following the sklearn docs example

  • First, perform the GridSearchCV
  • Second, use cross_val_score
  • Note that, for demonstration purposes, I have only used 1 sklearn model (SVC) from the list of models in the question's example code
start = time.time()
for name, model in [models[-1]]:
  # Non_nested parameter search and scoring
  with joblib.parallel_backend('dask'):
    model.fit(X_train, Y_train)
  non_nested_results.append(model.best_score_)

  # Nested CV with parameter optimization
  nested_score = cross_val_score(model, X=X_train, y=Y_train, cv=outer_cv)
  nested_results.append(nested_score.mean())

  names.append(name)
  msg = "Nested CV Accuracy %s: %f (+/- %f )" %\
        (name, np.mean(nested_results)*100, np.std(nested_results)*100)
  print(msg)
  print('Test set accuracy: {:.2f}'.format(model.score(X_test, Y_test)*100),  '%')
  print("Best Estimator: \n{}\n".format(model.best_estimator_))
  print("Best Parameters: \n{}\n".format(model.best_params_))
  print("Best CV Score: \n{}\n".format(model.best_score_))

score_difference = [a_i - b_i for a_i, b_i in zip(non_nested_results, nested_results)]
print("Average difference of {0:6f} with std. dev. of {1:6f}."
      .format(np.mean(score_difference), np.std(score_difference)))

print('Total running time of the script: {:.2f} seconds' .format(time.time()-start))

client.close()

Below is the output (with script execution times) using dataset 3.

Output + timing without dask ¹

Nested CV Accuracy SVM: 20.416667 (+/- 0.000000 )
Test set accuracy: 16.67 %
Best Estimator: 
SVC(C=0.75, cache_size=200, class_weight=None, coef0=0.0,
  decision_function_shape='ovr', degree=3, gamma='scale', kernel='linear',
  max_iter=-1, probability=True, random_state=None, shrinking=True,
  tol=0.001, verbose=False)

Best Parameters: 
{'C': 0.75, 'kernel': 'linear'}

Best CV Score: 
0.2375

Average difference of 0.033333 with std. dev. of 0.000000.
Total running time of the script: 23.96 seconds

Output + timing with dask (using n_workers=1 and threads_per_worker=4) ²

Nested CV Accuracy SVM: 18.750000 (+/- 0.000000 )
Test set accuracy: 13.33 %
Best Estimator: 
SVC(C=0.5, cache_size=200, class_weight=None, coef0=0.0,
  decision_function_shape='ovr', degree=3, gamma='scale', kernel='rbf',
  max_iter=-1, probability=True, random_state=None, shrinking=True,
  tol=0.001, verbose=False)

Best Parameters: 
{'C': 0.5, 'kernel': 'rbf'}

Best CV Score: 
0.1916666666666667

Average difference of 0.004167 with std. dev. of 0.000000.
Total running time of the script: 8.84 seconds

Output + timing with dask (using n_workers=4 and threads_per_worker=4) ²

Nested CV Accuracy SVM: 23.333333 (+/- 0.000000 )
Test set accuracy: 21.67 %
Best Estimator: 
SVC(C=0.25, cache_size=200, class_weight=None, coef0=0.0,
  decision_function_shape='ovr', degree=3, gamma='scale', kernel='linear',
  max_iter=-1, probability=True, random_state=None, shrinking=True,
  tol=0.001, verbose=False)

Best Parameters: 
{'C': 0.25, 'kernel': 'linear'}

Best CV Score: 
0.25

Average difference of 0.016667 with std. dev. of 0.000000.
Total running time of the script: 7.52 seconds

Output + timing with dask (using n_workers=1 and threads_per_worker=8) ²

Nested CV Accuracy SVM: 20.416667 (+/- 0.000000 )
Test set accuracy: 18.33 %
Best Estimator: 
SVC(C=1, cache_size=200, class_weight=None, coef0=0.0,
  decision_function_shape='ovr', degree=3, gamma='scale', kernel='rbf',
  max_iter=-1, probability=True, random_state=None, shrinking=True,
  tol=0.001, verbose=False)

Best Parameters: 
{'C': 1, 'kernel': 'rbf'}

Best CV Score: 
0.23333333333333334

Average difference of 0.029167 with std. dev. of 0.000000.
Total running time of the script: 7.06 seconds

¹ Using sklearn.model_selection.GridSearchCV(), without the joblib dask backend

² Using dask_ml.model_selection.GridSearchCV() in place of sklearn.model_selection.GridSearchCV(), with the joblib dask backend

Notes on the code and output in this answer

  • I noticed that in your question you have the order of sklearn.model_selection.GridSearchCV() and cross_val_score inverted compared to the example in the docs. I'm not sure whether that affects your question much, but I thought I would mention it
  • I have no experience with nested cross-validation, so I cannot comment on whether Client(..., n_workers=n, threads_per_worker=m) with n>1 and/or m=4 or m=8 is acceptable or incorrect

General comments on using dask_ml (as far as I understand it)

  • Case 1: if the training data is small enough to fit in memory on a single machine, but the test dataset does not fit in memory, you can use the ParallelPostFit wrapper to read the test data onto the cluster in parallel and to make predictions on it in parallel using all the workers on the cluster. IIUC, this case is not relevant to your question
  • Case 2: if you want to use joblib to train a large scikit-learn model on a cluster (but the train/test data fits in memory) - aka distributed scikit-learn - then you can use the cluster for training, and the skeleton code (per the dask_ml docs) looks like the client/joblib setup shown above. IIUC, this case is relevant to your question and is the approach I have used in this answer

System details (used to execute the code)

dask==1.2.0
dask-ml==0.12.0
numpy==1.16.2+mkl
pandas==0.24.0
scikit-learn==0.20.3
sklearn==0.0
OS==Windows 8 (64-bit)
Python version (import platform; print(platform.python_version()))==3.7.2