将多行结构化流式传输到熊猫udf

问题描述 投票:1回答:1

我正在编写一个结构化的流作业,该作业从eventhub接收数据。经过一些准备后,我在每一行上应用了pandas_udf函数,以使用泡菜模型进行预测来创建新列。

我遇到了一个严重的问题:有时pandas_udf的输入是一组行而不是一行(按预期)。这导致我出错:

RuntimeError: Result vector from pandas_udf was not the required length: expected 2, got 1

之所以会这样,是因为pandas_udf收到多行(在本例中为2)。

这怎么可能? .withColumn不应该在每一行上逐行执行吗?

这是我的代码:

dfInt = spark \
    .readStream \
    .load() \
    .selectExpr("cast (body as string) as json") \
    .select(from_json("json",schema).alias("data")) \
    .withColumn("k", expr("uuid()")) \
    .select("key", explode("data.features").alias("feat")) \
    .select("feat.*", "key") \
    .groupBy("k") \
    .agg(*expressions) \
    .drop("k") \
    .na.drop() \
    .withColumn("prediction", predict( (F.struct([col(x) for x in (features)]))))

pandas_udf为以下:

@pandas_udf(FloatType())
def predict(x):
  return pd.Series(pickle_model.predict_proba(x)[0][1])

实际上,问题似乎出在udf的withColumn调用之前,因为更多的行来自上一步。groupBy聚合返回单个行,因为进行分组依据的键是唯一的。

您知道是什么原因吗?

pandas apache-spark pyspark user-defined-functions
1个回答
0
投票

在这种情况下,您使用的是SCALAR pandas_udf,它将熊猫系列作为输入并返回相同大小的pandas.Series。我不知道内部的确切细节,但我的理解是,每个执行器都会将您的列(F.struct([col(x) for x in (features)]))转换为执行器当前正在处理的Dataframe分区的pandas.Series并将该函数应用于系列。一个分区由许多行组成,因此您不能假定该系列的长度仅为一。您需要确保为所有行保留所有预测的Proba。您可能可以做这样的事情(假设您确实只对保持1类概率感兴趣):

@pandas_udf(FloatType())
def predict(x):
    return pd.Series(pickle_model.predict_proba(x)[:,1])
© www.soinside.com 2019 - 2024. All rights reserved.