结构化流如何执行pandas_udf?

问题描述 投票:0回答:1

我想了解结构化流处理如何处理新数据。

如果同时有更多行到达,则将它们附加到输入流数据帧中,对吧?

如果我有一个withColumn并应用pandas_udf,则每行调用一次该函数,或者仅调用一次,然后将这些行传递给pandas_udf?

让我们这样说:

dfInt = spark \
    .readStream \
    .load() \
    .withColumn("prediction", predict( (F.struct([col(x) for x in (features)]))))

如果更多的行同时到达,则将它们一起处理或每行处理一次?=是否有机会将其每次限制为仅一行?

apache-spark pyspark spark-structured-streaming
1个回答
0
投票

如果同时有更多行到达,则将它们附加到输入流数据帧中,对吧?

仅讨论微批量执行引擎,对吧?这就是您最有可能在流式查询中使用的内容。

结构化流使用Source.getBatch(DataSource API V1)在流查询中查询流源:

getBatch(开始:选项[偏移],结束:偏移):DataFrame

返回偏移量之间的数据(startend]。当startNone时,批处理应从第一条记录开始。

源在DataFrame中返回的都是微批处理中的数据。

如果我有一个withColumn并应用pandas_udf,则每行调用一次该函数

总是。这就是用户定义函数在Spark SQL中的工作方式。

或仅一次并将行传递到pandas_udf?

This说:

Pandas UDF是用户定义的函数,由Spark使用Arrow来传输数据,而Pandas使用该数据来执行。

Python函数应将pandas.Series作为输入并返回相同长度的pandas.Series。在内部,Spark将通过将列拆分为批次并将每个批次的函数作为数据的子集调用,然后将结果串联在一起来执行Pandas UDF。

如果同时到达更多行,则将它们一起处理或每行处理一次?

如果“到达”表示“单个DataFrame的一部分”,则“将它们一起处理”,但一次(根据UDF合同)一次一行。

是否有机会每次将其限制为仅一行?

您不必。就是这样设计的。一次仅一行。

© www.soinside.com 2019 - 2024. All rights reserved.