很抱歉这里的消息很长,但我的理解有限,需要一些帮助!总的来说,我对 Snowpark 和 Snowflake 还很陌生。
本质上我想做的是一种非常标准的数据科学实践。我正在获取一些数据,执行一些特征工程(使用独热编码和序数编码)并拟合 xgboost 模型。完成后,我将通过 PUT 方法将编码器和模型保存到 Snowflake 中。然后我创建一个 udf,以便我可以使用我创建的模型进行推理,但这就是我的过程失败的地方。
从错误中我看到似乎在我加载编码器后,它无法转换我的推理数据,并指出以下错误:
“输入类型不支持 ufunc 'isnan',并且根据转换规则 ''safe'',无法将输入安全地强制为任何支持的类型”
这个错误确实让我感到困惑,因为它似乎在已知值内看到了 NAN,但事实不应该是这样,因为它已经在一组特定的已知值上进行了拟合和测试!
我尝试过通过推理函数输出已知值,但这会产生一些非常奇怪的结果,它给我一个字符串列表,但没有逗号分隔,例如:
['A' 'B' 'C'] - 据我所知,这不是任何语言都能理解的结构。
总的来说,我的问题是,如何将编码器和模型存储在 Snowflake 中,以便在推理过程中使用它们???
我将添加我创建的代码的基本视图,以便其他人可以使用它并找到相同的错误 -
from snowflake.snowpark.session import Session
import snowflake.snowpark.functions as F
import snowflake.connector
import json
import numpy as np
from snowflake.ml.modeling.preprocessing import OneHotEncoder,OrdinalEncoder
from snowflake.ml.modeling.xgboost import XGBRegressor
# this connection info is just standard stuff
# Credentials (account, user, warehouse, database, schema, ...) live in a
# local connection.json; build one Snowpark session for the whole script.
connection_parameters = json.load(open('connection.json'))
session = Session.builder.configs(connection_parameters).create()
# Five mock flights: ID, departure/arrival airport codes, route category,
# departure/arrival timestamps (as strings), seat capacity, and the TARGET
# label the regressor will be trained on.
mock_df = session.create_dataframe(
[[979152,"A","XX","SUN","2023-11-24 08:30:00","2023-11-24 12:25:00",189,0.62],
[987073,"A","ZZ","SUN","2023-12-13 16:15:00","2023-12-13 11:25:00",189,0.75],
[951384,"C","YY","FAR_SUN","2023-12-05 09:40:00","2023-12-05 13:35:00",189,0.88],
[952380,"B","WW","FAR_SUN","2023-11-22 19:45:00","2023-11-22 14:30:00",235,0.86],
[963602,"B","ZZ","FAR_SUN","2023-12-29 10:30:00","2023-12-29 15:05:00",235,0.66]],
schema=[
"ID","AIRPORT","A_AIRPORT",
"CATEGORY","D_DATETIME","A_DATETIME","CAPACITY","TARGET"
]
)
# Parse the timestamp strings into real TIMESTAMP columns (kept alongside the
# originals) so date-part functions can be applied server-side.
mock_df = mock_df.select_expr("*","TO_TIMESTAMP(D_DATETIME) AS D_DATETIME_T","TO_TIMESTAMP(A_DATETIME) AS A_DATETIME_T")
# One-hot encode CATEGORY server-side; handle_unknown='ignore' makes unseen
# categories produce an all-zero row at transform time instead of raising.
ohe = OneHotEncoder(handle_unknown='ignore',input_cols='CATEGORY',output_cols='ROUTE_OHE')
ohe.fit(mock_df)
mock_ohe = ohe.transform(mock_df)
print(ohe.categories_)
# Pin the ordinal category order explicitly so the AIRPORT -> integer mapping
# is stable across refits.
categories = {
"AIRPORT": np.array(['A', 'B', 'C'])
}
oe = OrdinalEncoder(
handle_unknown='use_encoded_value',unknown_value=-1,
encoded_missing_value=-1,input_cols='AIRPORT',
output_cols='AIRPORT_ENCODE',
categories=categories
)
# Both unknown and missing airport codes encode to -1.
oe.fit(mock_ohe)
mock_oe = oe.transform(mock_ohe)
print(oe.categories_)
# Derive calendar/time-of-day features from the parsed timestamps; each
# withColumn adds a server-side expression, nothing is computed locally.
_depart_ts = F.col('D_DATETIME_T')
_arrive_ts = F.col('A_DATETIME_T')
mock_oe = (
    mock_oe
    .withColumn('depart_hour', F.hour(_depart_ts))
    .withColumn('depart_weekday', F.dayofweek(_depart_ts))
    .withColumn('depart_monthday', F.dayofmonth(_depart_ts))
    .withColumn('depart_yearday', F.dayofyear(_depart_ts))
    .withColumn('depart_month', F.month(_depart_ts))
    .withColumn('depart_year', F.year(_depart_ts))
    .withColumn('arrive_hour', F.hour(_arrive_ts))
)
# Gradient-boosted regressor trained server-side on the engineered features.
# NOTE: Snowflake upper-cases unquoted identifiers when storing, hence the
# upper-case input_cols for columns that were created above in lower case.
xgb = XGBRegressor(
n_estimators = 100,
max_depth = 3,
input_cols=[
"AIRPORT_ENCODE","ROUTE_OHE_FAR_SUN","ROUTE_OHE_SUN",
"CAPACITY","DEPART_HOUR",
"DEPART_WEEKDAY","DEPART_MONTHDAY","DEPART_YEARDAY",
"DEPART_MONTH","DEPART_YEAR","ARRIVE_HOUR"
],
label_cols="TARGET",output_cols="xgb_prediction"
)
xgb.fit(mock_oe)
from joblib import dump
def save_object(object_, filename, stagename, auto_compress=True):
    """Serialize object_ to a local file with joblib, then PUT it to the stage.

    With auto_compress=True Snowflake gzips the file on upload, so it lands
    on the stage with a .gz suffix appended to filename.
    """
    dump(object_, filename)
    session.file.put(filename, stagename, overwrite=True, auto_compress=auto_compress)
# Extract model object
# Convert the Snowflake-ML wrappers to their plain xgboost / sklearn
# equivalents so they can be joblib-dumped and reloaded inside a UDF.
xgb_model = xgb.to_xgboost()
ohe_obj = ohe.to_sklearn()
oe_obj = oe.to_sklearn()
# The model uses auto_compress=True, so it is stored as xgb_model.joblib.gz;
# the encoders are uploaded uncompressed — the add_import names must match.
save_object(xgb_model,'xgb_model.joblib','@AM_TEST_MODELS')
save_object(ohe_obj,'one_hot_encode.joblib','@AM_TEST_MODELS',auto_compress=False)
save_object(oe_obj,'ordinal_encode.joblib','@AM_TEST_MODELS',auto_compress=False)
session.add_import("@AM_TEST_MODELS/xgb_model.joblib.gz")
session.add_import("@AM_TEST_MODELS/one_hot_encode.joblib")
session.add_import("@AM_TEST_MODELS/ordinal_encode.joblib")
session.add_packages("pandas==1.5.3","joblib==1.2.0","xgboost==1.7.3","scikit-learn==1.2.2")
##################################
@F.udf(name='predict_target',session=session,replace=True,is_permanent=True,stage_location='@AM_TEST_UDFS')
def predict_target(data: list) -> float:
    """Score one flight row with the persisted encoders + xgboost model.

    ``data`` is the ARRAY built by the caller:
    [ID, AIRPORT, A_AIRPORT, CATEGORY, D_DATETIME, A_DATETIME, CAPACITY].

    Fixes vs. the original:
    * The sklearn encoders were fit on single columns, so transform() must
      receive ONLY those columns. Passing the whole mixed-type frame made
      OneHotEncoder run isnan over object data — the reported
      "ufunc 'isnan' not supported" error.
    * The model is given exactly the (upper-case, as stored by Snowflake
      during training) feature columns it was fit on, not the whole frame.
    """
    import sys
    import pandas as pd
    from joblib import load
    # Files staged via session.add_import are extracted into this directory.
    import_dir = sys._xoptions["snowflake_import_directory"]
    ohe = load(import_dir + 'one_hot_encode.joblib')
    oe = load(import_dir + 'ordinal_encode.joblib')
    model = load(import_dir + 'xgb_model.joblib.gz')
    features = [
        "ID","AIRPORT","A_AIRPORT",
        "CATEGORY","D_DATETIME","A_DATETIME","CAPACITY"
    ]
    df = pd.DataFrame([data], columns=features)
    # One-hot encode CATEGORY only; rebuild the column names the way
    # Snowflake's OneHotEncoder named them at fit time (ROUTE_OHE_<category>).
    ohe_vals = ohe.transform(df[['CATEGORY']])
    if hasattr(ohe_vals, 'toarray'):  # sklearn may return a sparse matrix
        ohe_vals = ohe_vals.toarray()
    ohe_cols = ['ROUTE_OHE_' + str(c) for c in ohe.categories_[0]]
    df[ohe_cols] = ohe_vals
    # Ordinal-encode AIRPORT only (unknown/missing -> -1 per the fit options).
    df['AIRPORT_ENCODE'] = oe.transform(df[['AIRPORT']])
    # Parse the timestamp strings and rebuild the calendar features under the
    # upper-case names used at training time.
    depart = pd.to_datetime(df['D_DATETIME'], format='%Y-%m-%d %H:%M:%S', yearfirst=True)
    arrive = pd.to_datetime(df['A_DATETIME'], format='%Y-%m-%d %H:%M:%S', yearfirst=True)
    df['DEPART_HOUR'] = depart.dt.hour
    # snowpark function goes from 1-7 whereas pandas goes from 0-6
    df['DEPART_WEEKDAY'] = depart.dt.day_of_week + 1
    df['DEPART_MONTHDAY'] = depart.dt.day
    df['DEPART_YEARDAY'] = depart.dt.day_of_year
    df['DEPART_MONTH'] = depart.dt.month
    df['DEPART_YEAR'] = depart.dt.year
    df['ARRIVE_HOUR'] = arrive.dt.hour
    # Feed the model only the columns it was trained on, in training order.
    model_cols = [
        "AIRPORT_ENCODE","ROUTE_OHE_FAR_SUN","ROUTE_OHE_SUN",
        "CAPACITY","DEPART_HOUR",
        "DEPART_WEEKDAY","DEPART_MONTHDAY","DEPART_YEARDAY",
        "DEPART_MONTH","DEPART_YEAR","ARRIVE_HOUR"
    ]
    return float(model.predict(df[model_cols])[0])
##################################
# Five unseen flights to score. NOTE(review): the original called col /
# array_construct / call_udf without importing them (they are only imported
# much later in the post); qualify them via the already-imported module F.
inference_df = session.create_dataframe(
    [[979152,"C","ZZ","SUN","2023-11-01 16:30:00","2023-11-01 20:25:00",189],
     [987073,"C","ZZ","SUN","2023-12-18 19:15:00","2023-12-18 22:25:00",189],
     [951384,"A","YY","FAR_SUN","2023-12-06 15:40:00","2023-12-06 17:35:00",189],
     [952380,"A","WW","FAR_SUN","2023-11-22 10:45:00","2023-11-22 14:30:00",235],
     [963602,"B","WW","FAR_SUN","2023-11-30 13:30:00","2023-12-29 15:05:00",235]],
    schema=[
        "ID","AIRPORT","A_AIRPORT",
        "CATEGORY","D_DATETIME","A_DATETIME","CAPACITY"
    ]
)
# Pack the row into an ARRAY and hand it to the scalar UDF row by row.
inference_df.select(
    "ID","AIRPORT","A_AIRPORT",
    "CATEGORY","D_DATETIME","A_DATETIME","CAPACITY",
    F.call_udf('predict_target',
        F.array_construct(
            F.col("ID"),F.col("AIRPORT"),F.col("A_AIRPORT"),
            F.col("CATEGORY"),F.col("D_DATETIME"),
            F.col("A_DATETIME"),F.col("CAPACITY")
        )
    ).as_("PREDICTED_TARGET")
).show()
其他可能有用的注意事项是:
雪花版本 - 7.44.2
Snowpark for Python 版本 - 1.9.0
本地环境 Python 版本 - 3.11(我无法降级 - 不要问)
本地环境包:
pandas - 1.5.3 joblib - 1.2.0 xgboost - 1.7.3 sklearn - 1.2.2 numpy - 1.25.2
如果您需要任何其他信息,请询问 - 我在这里很茫然,感谢我能得到的任何帮助!
提前致谢
所以我的主要问题是一个非常愚蠢的问题,实际上是我如何调用编码器。我错误地尝试使用 Snowpark API 转换数据,而不是使用 scikit-learn API,因为在我的 udf 中我使用 pandas 数据框。 我的代码还有其他一些不正确的地方,所以围绕调用列名称(雪花在存储表时将列名称更改为大写)和数据帧的随机多重索引的问题,但我已经修复了这些问题,现在看起来很稳定。 我改变的另一件事是函数本身以及我如何调用它,这是基于我从 Snowflake 的某人那里得到的一些反馈。现在,该函数正在利用其矢量化功能,其中传递整个数据帧,该数据帧被处理并传回,而不是按行运行。这伴随着另一个被缓存的函数,这意味着我们在运行期间只加载模型一次。
推理函数的完整代码如下
import cachetools
@cachetools.cached(cache={})
def read_file(filename):
    """Load a joblib-serialized object from the UDF import directory.

    Cached so each artifact is deserialized once per process instead of once
    per call/batch.
    """
    import sys
    import os
    import joblib
    # Get the path where files added through session.add_import are available.
    import_dir = sys._xoptions.get("snowflake_import_directory")
    if not import_dir:
        # The original silently fell through and returned None here, which
        # surfaced later as a confusing AttributeError; fail at the real cause.
        raise RuntimeError(
            "snowflake_import_directory is not set; "
            "read_file must run inside a Snowflake UDF"
        )
    with open(os.path.join(import_dir, filename), 'rb') as file:
        return joblib.load(file)
# Stage artifacts for the vectorised UDF, plus a CSV of pre-computed target
# encodings. NOTE(review): earlier code saved .joblib files — presumably the
# .pkl files were re-uploaded separately; confirm the stage contents match.
session.add_import("@AM_TEST_MODELS/xgb_model.pkl.gz")
session.add_import("@AM_TEST_MODELS/one_hot_encode.pkl")
session.add_import("@AM_TEST_MODELS/ordinal_encode.pkl")
session.add_import("@AM_TEST_MODELS/target_encoding.csv.gz")
session.add_packages("pandas==1.5.3","joblib==1.2.0","xgboost==1.7.3","scikit-learn==1.2.2","cloudpickle==2.2.1","cachetools","snowflake-ml-python")
from snowflake.snowpark.types import PandasDataFrameType,PandasSeriesType,IntegerType,StringType,FloatType,PandasDataFrame,PandasSeries
import pandas as pd
@F.udf(
    name='predict_package_mix_p',session=session,replace=True,
    is_permanent=True,stage_location='@AM_TEST_UDFS',
)
def predict_package_mix_p(
    df:PandasDataFrame[int,str,str,str,str,str,int]
) -> PandasSeries[float]:
    """Vectorised UDF: score a batch of flights, one prediction per input row.

    Receives the whole batch as a pandas DataFrame (columns arrive in the
    positional order of the call), rebuilds the training features with the
    process-cached encoders from read_file, and returns a float Series of
    the same length and order as the input.
    """
    import sys
    import pandas as pd

    def apply_target_encoding(frame, key_col, encoding_df):
        # Left-join the pre-computed target encoding. how='left' keeps every
        # input row: the original inner join dropped rows whose key was
        # missing from the encoding table, so the UDF would have returned
        # fewer values than it received. Unmatched keys yield NaN, which
        # xgboost handles natively.
        return frame.merge(encoding_df, on=key_col, how='left')

    def underscore_spaces(frame):
        # Category values such as "FAR SUN" produce one-hot column names
        # containing spaces; normalise them to underscores to match the
        # training feature names.
        renames = {c: c.replace(" ", "_") for c in frame.columns if ' ' in c}
        return frame.rename(columns=renames)

    # Files staged via session.add_import are extracted into this directory.
    import_dir = sys._xoptions["snowflake_import_directory"]
    ohe = read_file('one_hot_encode.pkl')
    oe = read_file('ordinal_encode.pkl')
    te = pd.read_csv(import_dir + 'target_encoding.csv.gz')
    model = read_file('xgb_model.pkl.gz')
    features = [
        "LS1_FLIGHT_ID","DEPARTURE_AIRPORT_CODE","ARRIVAL_AIRPORT_CODE",
        "ROUTE_CATEGORY_NAME","DEPARTURE_DATETIME_LOCAL",
        "ARRIVAL_DATETIME_LOCAL","CAPACITY"
    ]
    # Vectorised UDF columns arrive positionally; rename to the real names.
    df.columns = features
    # One-hot encode only the column the encoder was fit on.
    # NOTE(review): assumes the encoder returns a dense array — confirm
    # (a sparse matrix would need .toarray() before pd.DataFrame).
    df_ohe = ohe.transform(df[['ROUTE_CATEGORY_NAME']])
    encoded_df = pd.DataFrame(df_ohe, columns=ohe.categories_)
    # categories_ yields a MultiIndex of column labels; flatten to level 0.
    encoded_df.columns = encoded_df.columns.get_level_values(0)
    encoded_df = encoded_df.add_prefix('ROUTE_NAME_OHE_')
    df = pd.concat([df, encoded_df], axis=1)
    df['DEPART_CODE_ENCODE'] = oe.transform(df[['DEPARTURE_AIRPORT_CODE']])
    df_final = underscore_spaces(
        apply_target_encoding(df, 'ARRIVAL_AIRPORT_CODE', te)
    )
    # Parse the timestamp strings and derive the calendar features under the
    # upper-case names used at training time.
    depart = pd.to_datetime(
        df_final['DEPARTURE_DATETIME_LOCAL'], format='%Y-%m-%d %H:%M:%S', yearfirst=True
    )
    arrive = pd.to_datetime(
        df_final['ARRIVAL_DATETIME_LOCAL'], format='%Y-%m-%d %H:%M:%S', yearfirst=True
    )
    df_final['DEPART_HOUR'] = depart.dt.hour
    # snowpark function goes from 1-7 whereas pandas goes from 0-6
    df_final['DEPART_WEEKDAY'] = depart.dt.day_of_week + 1
    df_final['DEPART_MONTHDAY'] = depart.dt.day
    df_final['DEPART_YEARDAY'] = depart.dt.day_of_year
    df_final['DEPART_MONTH'] = depart.dt.month
    df_final['DEPART_YEAR'] = depart.dt.year
    df_final['ARRIVE_HOUR'] = arrive.dt.hour
    # Feed the model only the columns it was trained on, in training order.
    model_cols = [
        "DEPART_CODE_ENCODE","ROUTE_NAME_OHE_CITY","ROUTE_NAME_OHE_FAR_SUN",
        "ROUTE_NAME_OHE_SKI","ROUTE_NAME_OHE_SUN","CAPACITY",
        "ARRIVAL_AIRPORT_CODE_ENCODED","DEPART_HOUR",
        "DEPART_WEEKDAY","DEPART_MONTHDAY","DEPART_YEARDAY",
        "DEPART_MONTH","DEPART_YEAR","ARRIVE_HOUR"
    ]
    return pd.Series(model.predict(df_final[model_cols]))
from snowflake.snowpark.session import Session
from snowflake.snowpark.functions import col,array_construct,call_udf
import json
import pandas as pd
# Create Snowflake Session object
# NOTE(review): this re-creates a session from the same connection.json —
# presumably this follow-up snippet was run as a separate script; confirm.
connection_parameters = json.load(open('connection.json'))
session = Session.builder.configs(connection_parameters).create()
session.sql_simplifier_enabled = True
# Five unseen flights to score with the vectorised UDF. Note route category
# values here use a space ("FAR SUN"), which the UDF normalises later.
_test_rows = [
    [979152, "LBA", "ALC", "SUN", "2023-11-24 08:30:00", "2023-11-24 12:25:00", 189],
    [987073, "LBA", "FAO", "SUN", "2023-12-13 16:15:00", "2023-12-13 11:25:00", 189],
    [951384, "STN", "FNC", "FAR SUN", "2023-12-05 09:40:00", "2023-12-05 13:35:00", 189],
    [952380, "MAN", "LPA", "FAR SUN", "2023-12-22 19:45:00", "2023-12-22 14:30:00", 235],
    [963602, "MAN", "FUE", "FAR SUN", "2023-12-29 10:30:00", "2023-12-29 15:05:00", 235],
]
_test_schema = [
    "LS1_FLIGHT_ID", "DEPARTURE_AIRPORT_CODE", "ARRIVAL_AIRPORT_CODE",
    "ROUTE_CATEGORY_NAME", "DEPARTURE_DATETIME_LOCAL", "ARRIVAL_DATETIME_LOCAL", "CAPACITY",
]
test_df = session.create_dataframe(_test_rows, schema=_test_schema)
# Score the batch: the vectorised UDF receives the listed columns as one
# pandas DataFrame and returns one prediction per row.
# NOTE(review): this relies on [*test_df] unpacking the Snowpark DataFrame
# into its column expressions in schema order — confirm; presumably
# equivalent to passing each column explicitly.
test_df.withColumn(
'PREDICTED_PACKAGE_MIX',
predict_package_mix_p([*test_df])).show()