我已经成功地训练和部署了一个基于 SKLearn 逻辑回归的多模型端点。它训练没有问题,并且两个端点都可以访问。但是当我发送我的文本数据时,我收到以下错误:
调用 InvokeEndpoint 操作时发生错误 (ModelError):从主容器 (primary) 收到服务器错误 (500),消息为 "'numpy.ndarray' object has no attribute 'lower'"。详情请参见该端点的 CloudWatch 日志。
这特别令人沮丧,因为我的脚本和代码中没有任何地方对 numpy 数组调用 .lower()。我只能推测:尽管我使用了 numpy 序列化器 (NumpySerializer),服务端仍然期望收到的数据具有 .lower() 方法。更令人沮丧的是,这本应是 sklearn 模型可以轻松处理的少数几种数据格式之一,我却无法使用。其余所有操作都是在笔记本 (notebook) 环境中完成的。
第一个模型作为单模型端点时工作得非常好,但是一旦我把它扩展为可容纳多个模型的多模型端点,以前能正常工作的完全相同的输入就突然不再工作了。这是我一直在使用的代码:
cell-1 训练器
def launch_training_job(name):
    """Upload one pickled dataset to S3 and start a SageMaker SKLearn training job.

    Relies on the notebook globals ``bucket`` and ``role`` and on ``boto3``,
    ``os`` and ``SKLearn`` already being imported.

    Args:
        name: Dataset/model name. ``"triage"`` maps to the local file
            ``train.pickle``; any other name maps to
            ``train_<name lower-cased, spaces replaced by underscores>.pickle``.

    Returns:
        The ``SKLearn`` estimator. ``fit`` is called with ``wait=False``, so the
        training job may still be running when this function returns.
    """
    model_name = name.replace(" ", "")

    # Resolve the dataset file name for this model.
    if name == "triage":
        path_name = "train"
    else:
        path_name = "train_" + name.lower().replace(" ", "_")
    pickle_file = f"{path_name}.pickle"

    # S3 prefixes and URIs. S3 keys always use "/", so they are built with
    # f-strings rather than os.path.join (which uses the local OS separator).
    s3_prefix = "script-mode-workflow"
    pickle_s3_prefix = f"{s3_prefix}/pickle"
    pickle_s3_uri = f"s3://{bucket}/{pickle_s3_prefix}"
    # Point the channel at the exact uploaded object. The previous value
    # ".../pickle/train" is treated by SageMaker as a key-name prefix, so the
    # "triage" job would also receive every "train_*.pickle" stored next to it
    # and the training script would then load whichever file it listed first.
    pickle_train_s3_uri = f"{pickle_s3_uri}/{pickle_file}"

    # Upload the dataset from the current working directory.
    local_pickle_path = os.path.join(os.getcwd(), pickle_file)
    s3_resource_bucket = boto3.Session().resource("s3").Bucket(bucket)
    s3_resource_bucket.Object(f"{pickle_s3_prefix}/{pickle_file}").upload_file(
        local_pickle_path
    )

    full_output_prefix = f"{s3_prefix}/model_artifacts/{model_name}"
    s3_output_path = f"s3://{bucket}/{full_output_prefix}"

    # Hyperparameters are not used by the training script but must exist.
    hyperparameters = {
        "copy_X": True,
        "fit_intercept": True,
        "normalize": False,
    }
    train_instance_type = "ml.m5.large"

    estimator_parameters = {
        "entry_point": "script2.py",
        "source_dir": "script",
        "output_path": s3_output_path,
        "framework_version": "1.0-1",
        "py_version": "py3",
        "instance_type": train_instance_type,
        "instance_count": 1,
        "hyperparameters": hyperparameters,
        "role": role,
        "base_job_name": model_name,
    }
    estimator = SKLearn(**estimator_parameters)

    # Single "train" channel; the job is started asynchronously.
    inputs = {"train": pickle_train_s3_uri}
    estimator.fit(inputs, wait=False)

    return estimator
cell-2 作业启动器
import shutil
import os
# Estimators are collected in launch order; later cells index into this list.
estimators = []

# Remove any local "data" directory; a missing directory is not an error.
shutil.rmtree("data", ignore_errors=True)

# Launch two training jobs: the fixed "triage" model and the first dataset's group.
# Only two models are trained here; a loop over DataSets could launch more.
estimator = launch_training_job("triage")
estimators += [estimator]
estimator = launch_training_job(DataSets[0]["group"])
estimators += [estimator]

# Report what was launched.
print()
launched_job_names = [est.latest_training_job.job_name for est in estimators]
print(f"{len(estimators)} training jobs launched: {launched_job_names}")
2 个培训工作启动:['triage-2023-05-17-01-17-51-276', 'ActiveDirectory-2023-05-17-01-17-52-435']
cell-3
from sagemaker.multidatamodel import MultiDataModel
from time import gmtime, strftime
# Build a deployable model from the first estimator; it is handed to the
# MultiDataModel below, which takes the container image for the endpoint from it.
estimator = estimators[0]
model = estimator.create_model(role=role)

# Endpoint and model share one timestamped (UTC) name.
timestamp = strftime("%Y-%m-%d-%H-%M-%S", gmtime())
ENDPOINT_NAME = f"mme-incident-triage-{timestamp}"
MODEL_NAME = ENDPOINT_NAME
ENDPOINT_INSTANCE_TYPE = "ml.m5.large"

mme = MultiDataModel(
    name=MODEL_NAME,
    model_data_prefix=model_data_prefix,
    model=model,
    sagemaker_session=sess,
)

# Deploy a single-instance endpoint and keep the predictor for later cells.
predictor = mme.deploy(
    initial_instance_count=1,
    instance_type=ENDPOINT_INSTANCE_TYPE,
    endpoint_name=ENDPOINT_NAME,
)
INFO:sagemaker:正在创建名称为:mme-incident-triage-2023-05-17-01-49-21 的模型 信息:sagemaker:正在创建名称为 mme-incident-triage-2023-05-17-01-49-21 的端点配置 信息:sagemaker:正在创建名为 mme-incident-triage-2023-05-17-01-49-21 的端点 ----!
cell-4
# Register every trained artifact with the multi-model endpoint.
for est in estimators:
    job_description = est.latest_training_job.describe()
    artifact_path = job_description["ModelArtifacts"]["S3ModelArtifacts"]
    # The fourth path component from the end is used as the model's file name.
    # Presumably this is the model name embedded in the training output path;
    # verify against the actual artifact URI.
    path_parts = artifact_path.split("/")
    model_name = f"{path_parts[-4]}.tar.gz"
    # Copy the model artifact to the S3 location used by the MME.
    mme.add_model(model_data_source=artifact_path, model_data_path=model_name)
cell-5
type(DataSets[0]['x_test'])
pandas.core.series.Series
cell-6“问题单元格”
from sagemaker.serializers import NumpySerializer
from sagemaker.deserializers import NumpyDeserializer
# Exchange NumPy payloads with the endpoint in both directions.
predictor.serializer, predictor.deserializer = NumpySerializer(), NumpyDeserializer()

# Route the request to the "triage" model hosted on the multi-model endpoint.
# NOTE(review): the inference script feeds the payload straight into a text
# pipeline, so it presumably has to be a 1-D collection of strings. Confirm
# that what is sent here is not a 2-D table.
pred = predictor.predict(
    target_model="triage.tar.gz",
    data=DataSets[0]["x_test"],
)
---------------------------------------------------------------------------
ModelError Traceback (most recent call last)
/tmp/ipykernel_8216/2859806012.py in <cell line: 2>()
1 #pred = predictor.predict(data=DataSets[0]['x_test'], target_model="triage.tar.gz")
----> 2 pred = predictor.predict(data=df, target_model="triage.tar.gz")
~/anaconda3/envs/pytorch_p39/lib/python3.9/site-packages/sagemaker/predictor.py in predict(self, data, initial_args, target_model, target_variant, inference_id)
159 data, initial_args, target_model, target_variant, inference_id
160 )
--> 161 response = self.sagemaker_session.sagemaker_runtime_client.invoke_endpoint(**request_args)
162 return self._handle_response(response)
163
~/anaconda3/envs/pytorch_p39/lib/python3.9/site-packages/botocore/client.py in _api_call(self, *args, **kwargs)
528 )
529 # The "self" in this scope is referring to the BaseClient.
--> 530 return self._make_api_call(operation_name, kwargs)
531
532 _api_call.__name__ = str(py_operation_name)
~/anaconda3/envs/pytorch_p39/lib/python3.9/site-packages/botocore/client.py in _make_api_call(self, operation_name, api_params)
958 error_code = parsed_response.get("Error", {}).get("Code")
959 error_class = self.exceptions.from_code(error_code)
--> 960 raise error_class(parsed_response, operation_name)
961 else:
962 return parsed_response
ModelError: An error occurred (ModelError) when calling the InvokeEndpoint operation: Received server error (500) from primary with message "'numpy.ndarray' object has no attribute 'lower'". See https://us-east-1.console.aws.amazon.com/cloudwatch/home?region=us-east-1#logEventViewer:group=/aws/sagemaker/Endpoints/mme-incident-triage-2023-05-17-01-49-21 in account 457933018994 for more information.
我的脚本应该以 numpy 数组的形式返回结果,所以我认为需要使用 numpy 反序列化器来读取数据。也许这恰恰说明我并不了解序列化器的工作原理;如果是这样,请告诉我需要怎么做才能解决这个问题。下面是我的训练脚本:
%%writefile script/script2.py
import argparse
import os
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.feature_extraction.text import TfidfVectorizer, TfidfTransformer
import pickle
def _str2bool(value):
    """Convert a command-line string such as ``"False"`` into a real ``bool``."""
    if isinstance(value, bool):
        return value
    text = str(value).strip().lower()
    if text in {"true", "t", "yes", "y", "1"}:
        return True
    if text in {"false", "f", "no", "n", "0"}:
        return False
    raise argparse.ArgumentTypeError(f"expected a boolean value, got {value!r}")


def parse_args(argv=None):
    """Parse the training script's command-line arguments.

    Args:
        argv: Optional list of argument strings. ``None`` (the default) makes
            argparse read ``sys.argv[1:]``, which is the original behavior.

    Returns:
        The ``(namespace, unknown_args)`` tuple from ``parse_known_args``, so
        unrecognised arguments are collected rather than rejected.
    """
    parser = argparse.ArgumentParser()

    # Hyperparameters sent by the client arrive as command-line arguments.
    # They are not used by training but are kept as a template.
    # ``type=bool`` would turn every non-empty string (including "False") into
    # True, so an explicit string-to-bool converter is used instead.
    parser.add_argument("--copy_X", type=_str2bool, default=True)
    parser.add_argument("--fit_intercept", type=_str2bool, default=True)
    parser.add_argument("--normalize", type=_str2bool, default=False)

    # Data directories; default to the SM_CHANNEL_* environment variables
    # (None when the variable is not set).
    parser.add_argument("--train", type=str, default=os.environ.get("SM_CHANNEL_TRAIN"))
    parser.add_argument("--test", type=str, default=os.environ.get("SM_CHANNEL_TEST"))

    # Model directory; defaults to the SM_MODEL_DIR environment variable.
    parser.add_argument("--model_dir", type=str, default=os.environ.get("SM_MODEL_DIR"))

    return parser.parse_known_args(argv)
def load_dataset(path):
    """Load the ``(X, y)`` pair stored in a pickle file under ``path``.

    Every file in ``path`` whose name ends with ``"pickle"`` is a candidate;
    only the first one in sorted order is loaded, so the choice is
    deterministic when several are present.

    Args:
        path: Directory containing the pickled dataset.

    Returns:
        The tuple ``(X, y)`` unpacked from the two-element pickled object.

    Raises:
        ValueError: If the directory contains no matching file.
    """
    files = sorted(
        os.path.join(path, file) for file in os.listdir(path) if file.endswith("pickle")
    )
    if not files:
        raise ValueError("Invalid # of files in dir: {}".format(path))

    # NOTE(security): unpickling executes arbitrary code; only load datasets
    # from a trusted location.
    with open(files[0], "rb") as dataset_file:
        X, y = pickle.load(dataset_file)
    return X, y
def start(args):
    """Train a text-classification pipeline and pickle it into the model directory.

    The dataset is read from ``args.train`` and the fitted pipeline is written
    to ``<args.model_dir>/model.pickle``.

    On any training error the message and traceback are written to a
    ``failure`` file in the output directory and to stderr, and the process
    exits with status 255.

    Args:
        args: Parsed arguments providing ``train`` and ``model_dir``.
    """
    # Imported locally because the failure handler needs them and the module
    # header does not import them; previously the handler raised NameError and
    # hid the real training error.
    import sys
    import traceback

    print("Training mode")

    try:
        X_train, y_train = load_dataset(args.train)

        print("Training...")
        # NOTE(review): TfidfVectorizer already applies tf-idf weighting, so the
        # extra TfidfTransformer step looks redundant. It is kept so the trained
        # model is unchanged; confirm whether it is intended.
        model = Pipeline(
            [
                ("vect", TfidfVectorizer()),
                ("tfidf", TfidfTransformer()),
                ("clf", LogisticRegression(max_iter=1000)),
            ]
        )
        model.fit(X_train, y_train)

        with open(os.path.join(args.model_dir, "model.pickle"), "wb") as model_file:
            pickle.dump(model, model_file)
    except Exception as e:
        trc = traceback.format_exc()
        message = "Exception during training: " + str(e) + "\n" + trc

        # Write out an error file; presumably SageMaker surfaces it as the
        # failureReason in the DescribeTrainingJob result. ``output_path`` was
        # never defined, so the directory is taken from SM_OUTPUT_DIR, falling
        # back to /opt/ml/output.
        output_dir = os.environ.get("SM_OUTPUT_DIR", "/opt/ml/output")
        os.makedirs(output_dir, exist_ok=True)
        with open(os.path.join(output_dir, "failure"), "w") as s:
            s.write(message)

        # Printing this puts the exception in the training job logs as well.
        print(message, file=sys.stderr)
        # A non-zero exit code causes the training job to be marked as Failed.
        sys.exit(255)
def model_fn(model_dir):
    """Load the pickled model used for inference.

    Args:
        model_dir: Directory that contains ``model.pickle``.

    Returns:
        The object unpickled from ``<model_dir>/model.pickle``.
    """
    # NOTE(security): unpickling executes arbitrary code; the artifact must
    # come from a trusted training job.
    with open(os.path.join(model_dir, "model.pickle"), "rb") as model_file:
        return pickle.load(model_file)
def _as_document_vector(input_data):
    """Return ``input_data`` as a 1-D array with one document per element."""
    docs = np.asarray(input_data)
    if docs.ndim == 0:
        # A single scalar document.
        return docs.reshape(1)
    if docs.ndim == 1:
        return docs
    if docs.size != docs.shape[0]:
        raise ValueError(
            "predict_fn expects one text document per row, got input of shape "
            f"{docs.shape}"
        )
    # Shape (n, 1, ...): drop the singleton axes so each element is a document.
    return docs.reshape(-1)


def predict_fn(input_data, model):
    """Apply the model to the incoming request.

    Args:
        input_data: The request payload: a scalar, a 1-D sequence, or an
            ``(n, 1)`` array of documents. Input with more than one column
            is rejected, because iterating it would hand whole rows (arrays)
            to the model instead of single documents.
        model: Object exposing ``predict``, ``predict_proba`` and ``classes_``.

    Returns:
        A 1-D object array of length ``2 * n``: the ``n`` predicted labels
        followed by the ``n`` probabilities of those predicted labels.

    Raises:
        ValueError: If ``input_data`` has more than one column.
    """
    docs = _as_document_vector(input_data)

    labels = np.asarray(model.predict(docs))
    probabilities = np.asarray(model.predict_proba(docs))

    # Column of ``predict_proba`` that belongs to each class label.
    column_of = {label: column for column, label in enumerate(model.classes_)}
    columns = np.fromiter(
        (column_of[label] for label in labels), dtype=np.intp, count=len(labels)
    )
    confidences = probabilities[np.arange(len(labels)), columns]

    # Concatenate as objects so the float confidences are neither truncated nor
    # cast to the label dtype, which happened when they were written into a
    # copy of the label array.
    return np.concatenate([labels.astype(object), confidences.astype(object)])
if __name__ == "__main__":
    # Script entry point: parse the known arguments (unrecognised ones are
    # returned separately and ignored here), then run training.
    known_args, _unknown_args = parse_args()
    start(known_args)