如何在批量用例的结构化流中实现窗口函数?

问题描述 投票:0回答:1

首先是一些背景知识,在一些业务需求将所有管道合并到一个中心位置之后,我最近不得不将管道从标准读取和写入转移到结构化流。 我所做的操作是,从

source
容器中读取数据,对数据进行一些转换,将其保存为
staging
中的表格,然后在进行一些其他转换后将数据移动到
analytics
层。

我正在使用代码:

df = spark.read.parquet(path)
transformed_df = do_transformations(df, primary_keys, transformation_name)
saveAsTable_tostaging() // 

我更新了上面的代码:

df = {spark.readStream.format.options.schema.load //}
transformed_df = do_transformations(df, primary_keys, transformation_name)
streaming_query = { transformed_df.writeStream.trigger ///}

Pseudo code for read and write streams. Note, the readStream() and writeStream() operations
are part of the central place and I cannot change them as other teams are also using it. 
I can only update my transformation() function. However, I am allowed to update the trigger details of the writeStream, which I changed to "once":True

对于我的用例,管道每天在

4AM
运行一次。对于我的用例来说,没有实际的流作业,因为我们每天只获取一次数据,这就是为什么它仍然被视为批处理作业并运行一次并停止的原因。 更改后,几乎所有转换仍然正常运行,并且整个读/写流程也正常运行,但有 1 个转换始终失败。

转换:

df
中有一些特定的字段,我们称它们为
'additive-fields'
,对于所有
summed
来说,它必须是
keys
。首先,我需要检查
table already exists
中是否有
staging
。如果是,那么我需要读取
staging
表并对同一字段上的
sum
staging table
的并集执行
df
操作,如果不是,那么我只需要执行
sum
操作在
df
上,然后每个键的单个记录应发送到
staging
以及这些
'additive-fields'

的总和

例如,令

df
=

|Key1| Key2| Nm1 | Nm2 | AdditiveField1| AdditiveField2| Db_timestamp| source_ts|
|----|-----|-----|-----|---------------|---------------|-------------|----------|
|A   |B    | Car | Bike| 10            | -0.02         |20231201     |20231127  |
|A   |B    |Taxi |Cycle|4              |-0.06          |20231201     |20231128  |
|B   |C    |Xyz  |ABC  |1              |-20            |20231201     |20231128  |

Staging_table
=

|Key1|Key2|Nm1|Nm2|AdditiveField1|AdditiveField2|Db_timestamp|source_ts|
|--- |----|---|---|--------------|--------------|------------|---------|
| B  | C  |Old|PQR|       7      |       1      |  20231120  | 20231101|

Here, Key1, Key2 are the keys, NM* are the normal fields on which I just have to take the latest record sorted by source_ts, and the additive fields need to be summed. 



所需的输出是:

|Key1|Key2|Nm1 | Nm2 |AdditiveField1|AdditiveField2|Db_timestamp|source_ts|
|----|----|----|-----|--------------|--------------|------------|---------|
| A  | B  |Taxi|Cycle|      14      |   -0.08      | 20231201   | 20231128|
| B  | C  |Xyz | ABC |       8      |    -19       | 20231201   | 20231128|

编辑:抱歉,无法修复问题中的表结构。 df、staging_df 和输出如下所示 https://prnt.sc/l3OuqsEWgGX-

当我使用标准读写时,我可以轻松地使用以下方法实现此操作:

def implement_additive_fields(df, table_name, primary_keys: list, fields: list):

    if check_tableExists(table_name): # returns true if table exists in staging else false.
        stmt = f"select * from {table_name}"
        staging_df = spark.sql(stmt)
        df = df.union(staging_df)

    window = Window.partitionBy(primary_keys).orderBy('source_ts').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

    For i in fields: #these are the additive fields in the df
        df = df.withColumn(i, F.sum(i).over(window))

    df = df.withColumn('rank', F.row_number().over(window)).filter(F.col('rank')==1).drop(F.col('rank'))

   return df

实现结构化流后,我知道我不能再在我的

window functions
上使用这些
df
,我需要使用可用于
window functions
structured streaming
,它有一些时间参数,但我不明白如何实施它们,因为我仍然以
batch
的身份运行这项工作(每天一次),然后它就关闭了。这些表没有实际的流处理作业,我想对来自
sum
表以及流式处理
staging
的记录进行
df

pyspark databricks azure-databricks spark-structured-streaming databricks-sql
1个回答
0
投票

您可以使用

foreachBatch
来调用函数并执行转换。

像这样修改你的函数:

def implement_additive_fields(df, table_name, primary_keys: list, fields: list):

    if check_tableExists(table_name):
        stmt = f"select * from {table_name}"
        staging_df = spark.sql(stmt)
        df = df.union(staging_df)

    window1 = Window.partitionBy(primary_keys).orderBy(F.desc('source_ts'))
    window2 = Window.partitionBy(primary_keys).orderBy('source_ts')

    for i in fields:
        df = df.withColumn(i, F.sum(i).over(window2))

    df = df.withColumn('rank', F.row_number().over(window1)).filter(F.col('rank')==1).drop(F.col('rank'))
    df.write.saveAsTable("tmp_staged")
    print("Done")

并这样称呼它:

primary_keys=['Key1','Key2']
fields=['AdditiveField1','AdditiveField2']
stream_df = spark.readStream.schema(schema=schema).format("csv").option("header","true").load("/csvs/")
stream_df.writeStream.option("checkpointlocation","tmpcheckpointloc").foreachBatch(lambda df,id: implement_additive_fields(df,'',primary_keys=primary_keys,fields=fields)).trigger(once=True).start().awaitTermination()

这是使用的输入数据:

钥匙1 钥匙2 Nm1 Nm2 附加字段1 附加字段2 Db_时间戳 来源_ts
A B 汽车 自行车 10 -0.02 20231201 20231127
A B 出租车 循环 4 -0.06 20231201 20231128
B C Xyz ABC 1 -2.0 2023431201 20231128

输出:

Output table

钥匙1 钥匙2 Nm1 Nm2 附加字段1 附加字段2 Db_时间戳 来源_ts
A B 出租车 循环 14 -0.08 20231201 20231128
B C Xyz ABC 8 -1 2023431201 20231128
© www.soinside.com 2019 - 2024. All rights reserved.