我想了解结构化流处理如何处理新数据。
如果同时有更多行到达,则将它们附加到输入流数据帧中,对吧?
如果我有一个withColumn并应用pandas_udf,则每行调用一次该函数,或者仅调用一次,然后将这些行传递给pandas_udf?
让我们这样说:
dfInt = spark \
.readStream \
.load() \
.withColumn("prediction", predict( (F.struct([col(x) for x in (features)]))))
如果更多的行同时到达,则将它们一起处理或每行处理一次?=是否有机会将其每次限制为仅一行?
如果同时有更多行到达,则将它们附加到输入流数据帧中,对吧?
仅讨论微批量执行引擎,对吧?这就是您最有可能在流式查询中使用的内容。
结构化流使用Source.getBatch(DataSource API V1)在流查询中查询流源:
getBatch(开始:选项[偏移],结束:偏移):DataFrame
返回偏移量之间的数据(
start
,end
]。当start
为None
时,批处理应从第一条记录开始。
源在DataFrame
中返回的都是微批处理中的数据。
如果我有一个withColumn并应用pandas_udf,则每行调用一次该函数
总是。这就是用户定义函数在Spark SQL中的工作方式。
或仅一次并将行传递到pandas_udf?
This说:
Pandas UDF是用户定义的函数,由Spark使用Arrow来传输数据,而Pandas使用该数据来执行。
Python函数应将
pandas.Series
作为输入并返回相同长度的pandas.Series
。在内部,Spark将通过将列拆分为批次并将每个批次的函数作为数据的子集调用,然后将结果串联在一起来执行Pandas UDF。如果同时到达更多行,则将它们一起处理或每行处理一次?
如果“到达”表示“单个DataFrame的一部分”,则“将它们一起处理”,但一次(根据UDF合同)一次一行。
是否有机会每次将其限制为仅一行?
您不必。就是这样设计的。一次仅一行。