在自定义 MLFlow PyFunc 类中使用 PipelineModel.load() 会导致错误

问题描述 投票:0回答:2

正在创建一个自定义 PyFunc 类,以与 Databricks 要素存储一起使用,作为其模型服务 UI,而要素存储的 log_model() 方法仅适用于 PythonModel 类。

底层模型是 PipelineModel(),它在模型之前执行各种分箱和转换。

功能是这样的:

import mlflow.pyfunc
from pyspark.ml.functions import vector_to_array
from pyspark.ml import PipelineModel
    class custom_model_class(mlflow.pyfunc.PythonModel):
    
      def __init__(self, model_path, threshold):
        self.model_path = model_path
        self.threshold = threshold
        self.model = None
    
      def load_context(self, context):
        self.model = PipelineModel.load(self.model_path)
    
      def predict(self, context, model_input):
        return self.model.transform(model_input).withColumn("prediction_opt_thresh", when(vector_to_array("probability")[1] > lit(self.threshold), 1).otherwise(0))
    
    custom_model = custom_model_class(model_path = pipeline_model_directory, threshold = 0.52)

但是我收到此错误:

'RuntimeError: SparkContext should only be created and accessed on the driver.', from <command-2164064949918430>, line 12.

我尝试了各种其他方法来解决该问题,但大多数都给出了相同的问题:

  1. JVM返回None,表示没有Spark Session
  2. 创建 Spark Session 返回上述错误
  3. 使用 mlflow.pyfunc.load_model(不使用 Spark 会话)加载 PipelineModel() 对象可以工作,但不会返回概率。

我该如何解决这个问题?有没有办法让Feature Store和MLLib一起工作?

apache-spark azure-databricks apache-spark-mllib mlflow feature-store
2个回答
0
投票

我设法解决了这个问题。问题在于我们使用要素存储的 log_model() 函数的方式。请参阅此处的文档https://docs.databricks.com/dev-tools/api/python/latest/feature-store/client.html

您必须在此方法中指定参数“flavor”。如果您指定的任何内容不是 mlflow.spark(但仍在批准的风格列表中),则使用 Score_batch() 函数运行模型的环境实际上不会有结果。

fs.log_model( 模型, “模型”, 味道=mlflow.pyfunc, 训练集=训练集, 注册模型名称=“示例模型” )

为了确保您在正确类型的环境中运行,您需要一个 Spark 上下文,并且风格必须是 mlflow.spark。

希望这对使用新功能存储 API 的任何人都有帮助。


0
投票

我也有类似的问题。我正在使用 FeatureStoreClient().log_model(...,flavor = mlflow.spark,...) 记录模型,并在尝试使用模型 fs.score_batch(f"models:/{model_name} 运行推理后/Production”,batch_scoring)我收到以下错误: error

我不知道如何解决这个问题,任何帮助将不胜感激。

© www.soinside.com 2019 - 2024. All rights reserved.