Why doesn't my fitted encoder work after saving it to Snowflake and trying to infer on new data?

Votes: 0 · Answers: 1

Sorry for the long message here, but my understanding is limited and I need some help! Overall, I'm still pretty new to Snowpark and Snowflake.

Essentially what I'm trying to do is a very standard data-science exercise. I take some data, do some feature engineering (one-hot and ordinal encoding) and fit an XGBoost model. Once that's done, I save the encoders and the model to Snowflake via a PUT. I then create a UDF so I can run inference with the model I created, but that's where my process fails.

From the error it looks like that after I load the encoder, it cannot transform my inference data, and it raises the following error:

"ufunc 'isnan' not supported for the input types, and the inputs could not be safely coerced to any supported types according to the casting rule ''safe''"

This error really confuses me, because it seems to be seeing NaN among the known values, but that shouldn't be the case since the encoder was already fitted and tested on a specific set of known values!
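As an aside, that exact message is what NumPy raises when `np.isnan` is applied to an object-dtype array (which is what a mixed list of ints and strings becomes once it lands in a pandas frame) - a minimal sketch, independent of Snowflake:

```python
import numpy as np

# np.isnan only supports numeric dtypes; an object array of mixed
# ints and strings cannot be safely coerced, so NumPy raises the
# same "ufunc 'isnan' not supported" TypeError the encoder reports.
mixed = np.array([979152, "A", "XX"], dtype=object)
try:
    np.isnan(mixed)
except TypeError as exc:
    print(type(exc).__name__)  # TypeError
```

So the encoder isn't finding NaN in the known values; it is choking on non-numeric data before it ever gets that far.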

I have tried printing the known values from within the inference function, but that produces some very odd output: it gives me a list of strings with no comma separators, for example:

['A' 'B' 'C'] - which, as far as I know, isn't a structure any language understands.
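(For what it's worth, that comma-less output is nothing sinister - it is just how NumPy prints arrays, so what you are seeing is the repr of an `np.ndarray` of strings, e.g. the encoder's `categories_`:)

```python
import numpy as np

# NumPy separates array elements with spaces, not commas,
# so ['A' 'B' 'C'] is the normal repr of a string array.
cats = np.array(['A', 'B', 'C'])
print(cats)        # ['A' 'B' 'C']
print(list(cats))  # a plain Python list, with commas
```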

Overall, my question is: how do I store the encoders and the model in Snowflake so that I can use them during inference?

I'll add a basic view of the code I created so that others can run it and reproduce the same error -

from snowflake.snowpark.session import Session
import snowflake.snowpark.functions as F
import snowflake.connector
import json
import numpy as np
from snowflake.ml.modeling.preprocessing import OneHotEncoder,OrdinalEncoder
from snowflake.ml.modeling.xgboost import XGBRegressor
 
# this connection info is just standard stuff
connection_parameters = json.load(open('connection.json'))
session = Session.builder.configs(connection_parameters).create()
 
mock_df = session.create_dataframe(
    [[979152,"A","XX","SUN","2023-11-24 08:30:00","2023-11-24 12:25:00",189,0.62],
     [987073,"A","ZZ","SUN","2023-12-13 16:15:00","2023-12-13 11:25:00",189,0.75],
     [951384,"C","YY","FAR_SUN","2023-12-05 09:40:00","2023-12-05 13:35:00",189,0.88],
     [952380,"B","WW","FAR_SUN","2023-11-22 19:45:00","2023-11-22 14:30:00",235,0.86],
     [963602,"B","ZZ","FAR_SUN","2023-12-29 10:30:00","2023-12-29 15:05:00",235,0.66]],
    schema=[
        "ID","AIRPORT","A_AIRPORT",
        "CATEGORY","D_DATETIME","A_DATETIME","CAPACITY","TARGET"
    ]
)
 
mock_df = mock_df.select_expr("*","TO_TIMESTAMP(D_DATETIME) AS D_DATETIME_T","TO_TIMESTAMP(A_DATETIME) AS A_DATETIME_T")
 
ohe = OneHotEncoder(handle_unknown='ignore',input_cols='CATEGORY',output_cols='ROUTE_OHE')
ohe.fit(mock_df)
mock_ohe = ohe.transform(mock_df)
print(ohe.categories_)
 
categories = {
    "AIRPORT": np.array(['A', 'B', 'C'])
}
oe = OrdinalEncoder(
    handle_unknown='use_encoded_value',unknown_value=-1,
    encoded_missing_value=-1,input_cols='AIRPORT',
    output_cols='AIRPORT_ENCODE',
    categories=categories
)
oe.fit(mock_ohe)
mock_oe = oe.transform(mock_ohe)
print(oe.categories_)
 
mock_oe = (
    mock_oe
    .withColumn('depart_hour',F.hour(mock_oe.D_DATETIME_T))
    .withColumn('depart_weekday',F.dayofweek(mock_oe.D_DATETIME_T))
    .withColumn('depart_monthday',F.dayofmonth(mock_oe.D_DATETIME_T))
    .withColumn('depart_yearday',F.dayofyear(mock_oe.D_DATETIME_T))
    .withColumn('depart_month',F.month(mock_oe.D_DATETIME_T))
    .withColumn('depart_year',F.year(mock_oe.D_DATETIME_T))
    .withColumn('arrive_hour',F.hour(mock_oe.A_DATETIME_T))
)
 
xgb = XGBRegressor(
    n_estimators = 100,
    max_depth = 3,
    input_cols=[
        "AIRPORT_ENCODE","ROUTE_OHE_FAR_SUN","ROUTE_OHE_SUN",
        "CAPACITY","DEPART_HOUR",
        "DEPART_WEEKDAY","DEPART_MONTHDAY","DEPART_YEARDAY",
        "DEPART_MONTH","DEPART_YEAR","ARRIVE_HOUR"
    ],
    label_cols="TARGET",output_cols="xgb_prediction"
)
 
xgb.fit(mock_oe)
 
 
from joblib import dump
 
def save_object(object_,filename,stagename,auto_compress=True):
 
    dump(object_, filename)
    session.file.put(filename, stagename, overwrite=True,auto_compress=auto_compress)
    return
 
# Extract model object 
xgb_model = xgb.to_xgboost()
ohe_obj = ohe.to_sklearn()
oe_obj = oe.to_sklearn()
 
save_object(xgb_model,'xgb_model.joblib','@AM_TEST_MODELS')
save_object(ohe_obj,'one_hot_encode.joblib','@AM_TEST_MODELS',auto_compress=False)
save_object(oe_obj,'ordinal_encode.joblib','@AM_TEST_MODELS',auto_compress=False)
 
session.add_import("@AM_TEST_MODELS/xgb_model.joblib.gz")
session.add_import("@AM_TEST_MODELS/one_hot_encode.joblib")
session.add_import("@AM_TEST_MODELS/ordinal_encode.joblib")
session.add_packages("pandas==1.5.3","joblib==1.2.0","xgboost==1.7.3","scikit-learn==1.2.2")

##################################
@F.udf(name='predict_target',session=session,replace=True,is_permanent=True,stage_location='@AM_TEST_UDFS')
def predict_target(data: list) -> float:
    import sys
    import pandas as pd
    from joblib import load
    import sklearn
    import xgboost as xgb
    import json
 
    IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
    import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]
    
    # open up the encoding files
    ohe = load(import_dir + 'one_hot_encode.joblib')
    oe = load(import_dir + 'ordinal_encode.joblib')
    model = load(import_dir + 'xgb_model.joblib.gz')
    print('loaded models')
    
    features = [
        "ID","AIRPORT","A_AIRPORT",
        "CATEGORY","D_DATETIME","A_DATETIME","CAPACITY"
    ]
 
    df = pd.DataFrame([data], columns=features)
    print('loaded dataframe')
 
    # transform data for one hot and ordinal encodings
    df_ohe = ohe.transform(df) # ERROR LINE
    df_oe = oe.transform(df_ohe) # ERROR LINE
    print('transformed via one hot and ordinal')
 
    # change date cols to datetime
    df_oe.loc[:,'D_DATETIME'] = pd.to_datetime(
        df_oe.loc[:,'D_DATETIME'],format='%Y-%m-%d %H:%M:%S',yearfirst=True
    )
    df_oe['A_DATETIME'] = pd.to_datetime(
        df_oe['A_DATETIME'],format='%Y-%m-%d %H:%M:%S',yearfirst=True
    )
    print('transformed dates')
 
    df_oe['depart_hour'] = df_oe['D_DATETIME'].dt.hour
    # snowpark function goes from 1-7 whereas pandas goes from 0-6
    df_oe['depart_weekday'] = df_oe['D_DATETIME'].dt.day_of_week + 1
    df_oe['depart_monthday'] = df_oe['D_DATETIME'].dt.day
    df_oe['depart_yearday'] = df_oe['D_DATETIME'].dt.day_of_year
    df_oe['depart_month'] = df_oe['D_DATETIME'].dt.month
    df_oe['depart_year'] = df_oe['D_DATETIME'].dt.year
    df_oe['arrive_hour'] = df_oe['A_DATETIME'].dt.hour
    print('created features')
    
    
    pm = model.predict(df_oe)[0]
    return pm
##################################

inference_df = session.create_dataframe(
    [[979152,"C","ZZ","SUN","2023-11-01 16:30:00","2023-11-01 20:25:00",189],
     [987073,"C","ZZ","SUN","2023-12-18 19:15:00","2023-12-18 22:25:00",189],
     [951384,"A","YY","FAR_SUN","2023-12-06 15:40:00","2023-12-06 17:35:00",189],
     [952380,"A","WW","FAR_SUN","2023-11-22 10:45:00","2023-11-22 14:30:00",235],
     [963602,"B","WW","FAR_SUN","2023-11-30 13:30:00","2023-12-29 15:05:00",235]],
    schema=[
        "ID","AIRPORT","A_AIRPORT",
        "CATEGORY","D_DATETIME","A_DATETIME","CAPACITY"
    ]
)
 
from snowflake.snowpark.functions import col, array_construct, call_udf

inference_df.select(
    "ID","AIRPORT","A_AIRPORT",
    "CATEGORY","D_DATETIME","A_DATETIME","CAPACITY",
    call_udf('predict_target',
             array_construct(
                 col("ID"),col("AIRPORT"),col("A_AIRPORT"),
                 col("CATEGORY"),col("D_DATETIME"),
                 col("A_DATETIME"),col("CAPACITY")
                )
    ).as_("PREDICTED_TARGET")
).show()

Other notes that may be useful:

Snowflake version - 7.44.2

Snowpark for Python version - 1.9.0

Local environment Python version - 3.11 (which I cannot downgrade - don't ask)

Local environment packages:

pandas - 1.5.3, joblib - 1.2.0, xgboost - 1.7.3, scikit-learn - 1.2.2, numpy - 1.25.2

If you need any other information please ask - I'm at a loss here and grateful for any help I can get!

Thanks in advance

python snowflake-cloud-data-platform

1 Answer (0 votes)

So my main problem was a really silly one - it was how I was calling the encoders. I was mistakenly trying to transform the data with the Snowpark API rather than the scikit-learn API, even though inside my UDF I'm working with a pandas DataFrame.

There were a couple of other things wrong with my code too, around referencing column names (Snowflake upper-cases column names when it stores a table) and a stray MultiIndex on the DataFrame, but I've fixed those and it now looks stable.

The other thing I changed, based on some feedback I got from someone at Snowflake, was the function itself and how I call it. The function now takes advantage of vectorised execution: the whole DataFrame is passed in, processed and passed back, rather than the UDF running row by row. This goes together with a second function that is cached, which means the models are only loaded once per run.
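To illustrate the scikit-learn-API point with plain scikit-learn (hypothetical column names, not the exact pipeline above): an encoder exported via `to_sklearn()` should be handed a pandas frame containing only the column(s) it was fitted on, not the whole mixed-type DataFrame:

```python
import pandas as pd
from sklearn.preprocessing import OrdinalEncoder

# Fit on just the category column...
oe = OrdinalEncoder(handle_unknown='use_encoded_value', unknown_value=-1)
oe.fit(pd.DataFrame({'AIRPORT': ['A', 'B', 'C']}))

# ...and at inference time select that column explicitly;
# passing the full frame (with numeric columns mixed in) is what
# can trigger the isnan TypeError from the question.
df = pd.DataFrame({'ID': [951384], 'AIRPORT': ['B']})
df['AIRPORT_ENCODE'] = oe.transform(df[['AIRPORT']]).ravel()
print(df['AIRPORT_ENCODE'].iloc[0])  # 1.0
```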

The full code for the inference function is below.

import cachetools

@cachetools.cached(cache={})
def read_file(filename):
    import sys
    import os
    import joblib
    # Get the "path" where files added through import are available
    import_dir = sys._xoptions.get("snowflake_import_directory")
    if import_dir:
        with open(os.path.join(import_dir, filename), 'rb') as file:
            m = joblib.load(file)
            return m

session.add_import("@AM_TEST_MODELS/xgb_model.pkl.gz")
session.add_import("@AM_TEST_MODELS/one_hot_encode.pkl")
session.add_import("@AM_TEST_MODELS/ordinal_encode.pkl")
session.add_import("@AM_TEST_MODELS/target_encoding.csv.gz")
session.add_packages("pandas==1.5.3","joblib==1.2.0","xgboost==1.7.3","scikit-learn==1.2.2","cloudpickle==2.2.1","cachetools","snowflake-ml-python")

from snowflake.snowpark.types import PandasDataFrameType,PandasSeriesType,IntegerType,StringType,FloatType,PandasDataFrame,PandasSeries
import pandas as pd


@F.udf(
        name='predict_package_mix_p',session=session,replace=True,
        is_permanent=True,stage_location='@AM_TEST_UDFS',
)
def predict_package_mix_p(
    df:PandasDataFrame[int,str,str,str,str,str,int]
) -> PandasSeries[float]:
    import sys
    import pandas as pd
    from joblib import load
    import sklearn
    import xgboost as xgb
    import json
    import snowflake.ml.modeling

    def transform_simple_target_encode_manual(
            df,transform_col,transform_df
        ):
        df = df.merge(transform_df, on=transform_col)
        return df
    
    def remove_space(df):
        cols = df.columns
        space_cols = [x for x in cols if ' ' in x]
        for c in space_cols:
            new_col = c.replace(" ","_")
            df = df.rename(columns={c:new_col})
        return df

    IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
    import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]

    ohe = read_file('one_hot_encode.pkl')
    oe = read_file('ordinal_encode.pkl')
    te = pd.read_csv(import_dir + 'target_encoding.csv.gz')
    model = read_file('xgb_model.pkl.gz')
    print('loaded models')
    
    features = [
        "LS1_FLIGHT_ID","DEPARTURE_AIRPORT_CODE","ARRIVAL_AIRPORT_CODE",
        "ROUTE_CATEGORY_NAME","DEPARTURE_DATETIME_LOCAL",
        "ARRIVAL_DATETIME_LOCAL","CAPACITY"
    ]

    df.columns = features
    print('loaded dataframe')

    # transform data for one hot and ordinal encodings
    df_ohe = ohe.transform(df[['ROUTE_CATEGORY_NAME']])
    encoded_df = pd.DataFrame(df_ohe, columns=ohe.categories_)
    encoded_df.columns = encoded_df.columns.get_level_values(0)
    encoded_df = encoded_df.add_prefix('ROUTE_NAME_OHE_')
    df = pd.concat([df, encoded_df], axis=1)
    df['DEPART_CODE_ENCODE'] = oe.transform(df[['DEPARTURE_AIRPORT_CODE']])

    print('transformed via one hot and ordinal')
    # transform using pre-set target encoding
    df_te = transform_simple_target_encode_manual(df,'ARRIVAL_AIRPORT_CODE',te)
    df_final = remove_space(df_te)
    print('transformed via target encode')

    # change date cols to datetime
    df_final.loc[:,'DEPARTURE_DATETIME_LOCAL'] = pd.to_datetime(
        df_final.loc[:,'DEPARTURE_DATETIME_LOCAL'],format='%Y-%m-%d %H:%M:%S',yearfirst=True
    )
    df_final['ARRIVAL_DATETIME_LOCAL'] = pd.to_datetime(
        df_final['ARRIVAL_DATETIME_LOCAL'],format='%Y-%m-%d %H:%M:%S',yearfirst=True
    )
    print('transformed dates')

    df_final['DEPART_HOUR'] = df_final['DEPARTURE_DATETIME_LOCAL'].dt.hour
    # snowpark function goes from 1-7 whereas pandas goes from 0-6
    df_final['DEPART_WEEKDAY'] = df_final['DEPARTURE_DATETIME_LOCAL'].dt.day_of_week + 1
    df_final['DEPART_MONTHDAY'] = df_final['DEPARTURE_DATETIME_LOCAL'].dt.day
    df_final['DEPART_YEARDAY'] = df_final['DEPARTURE_DATETIME_LOCAL'].dt.day_of_year
    df_final['DEPART_MONTH'] = df_final['DEPARTURE_DATETIME_LOCAL'].dt.month
    df_final['DEPART_YEAR'] = df_final['DEPARTURE_DATETIME_LOCAL'].dt.year
    df_final['ARRIVE_HOUR'] = df_final['ARRIVAL_DATETIME_LOCAL'].dt.hour
    print('created features')

    pm = pd.Series(model.predict(df_final[
            ["DEPART_CODE_ENCODE","ROUTE_NAME_OHE_CITY","ROUTE_NAME_OHE_FAR_SUN",
            "ROUTE_NAME_OHE_SKI","ROUTE_NAME_OHE_SUN","CAPACITY",
            "ARRIVAL_AIRPORT_CODE_ENCODED","DEPART_HOUR",
            "DEPART_WEEKDAY","DEPART_MONTHDAY","DEPART_YEARDAY",
            "DEPART_MONTH","DEPART_YEAR","ARRIVE_HOUR"]
        ]))
    return pm

from snowflake.snowpark.session import Session
from snowflake.snowpark.functions import col,array_construct,call_udf
import json
import pandas as pd

# Create Snowflake Session object
connection_parameters = json.load(open('connection.json'))
session = Session.builder.configs(connection_parameters).create()
session.sql_simplifier_enabled = True

test_df = session.create_dataframe(
    [[979152,"LBA","ALC","SUN","2023-11-24 08:30:00","2023-11-24 12:25:00",189],
     [987073,"LBA","FAO","SUN","2023-12-13 16:15:00","2023-12-13 11:25:00",189],
     [951384,"STN","FNC","FAR SUN","2023-12-05 09:40:00","2023-12-05 13:35:00",189],
     [952380,"MAN","LPA","FAR SUN","2023-12-22 19:45:00","2023-12-22 14:30:00",235],
     [963602,"MAN","FUE","FAR SUN","2023-12-29 10:30:00","2023-12-29 15:05:00",235]],
    schema=[
        "LS1_FLIGHT_ID","DEPARTURE_AIRPORT_CODE","ARRIVAL_AIRPORT_CODE",
        "ROUTE_CATEGORY_NAME","DEPARTURE_DATETIME_LOCAL","ARRIVAL_DATETIME_LOCAL","CAPACITY"
    ]
)

test_df.withColumn(
    'PREDICTED_PACKAGE_MIX',
    predict_package_mix_p([test_df[c] for c in test_df.columns])).show()