首先是一些背景知识,在一些业务需求将所有管道合并到一个中心位置之后,我最近不得不将管道从标准读取和写入转移到结构化流。 我所做的操作是,从
source
容器中读取数据,对数据进行一些转换,将其保存为staging
中的表格,然后在进行一些其他转换后将数据移动到analytics
层。
我正在使用代码:
df = spark.read.parquet(path)
transformed_df = do_transformations(df, primary_keys, transformation_name)
saveAsTable_tostaging() //
我更新了上面的代码:
df = {spark.readStream.format.options.schema.load //}
transformed_df = do_transformations(df, primary_keys, transformation_name)
streaming_query = { transformed_df.writeStream.trigger ///}
Pseudo code for read and write streams. Note, the readStream() and writeStream() operations
are part of the central place and I cannot change them as other teams are also using it.
I can only update my transformation() function. However, I am allowed to update the trigger details of the writeStream, which I changed to "once":True
对于我的用例,管道每天在
4AM
运行一次。对于我的用例来说,没有实际的流作业,因为我们每天只获取一次数据,这就是为什么它仍然被视为批处理作业并运行一次并停止的原因。
更改后,几乎所有转换仍然正常运行,并且整个读/写流程也正常运行,但有 1 个转换始终失败。
转换:
df
中有一些特定的字段,我们称它们为'additive-fields'
,对于所有summed
来说,它必须是keys
。首先,我需要检查 table already exists
中是否有 staging
。如果是,那么我需要读取 staging
表并对同一字段上的 sum
和 staging table
的并集执行 df
操作,如果不是,那么我只需要执行 sum
操作在 df
上,然后每个键的单个记录应发送到 staging
以及这些 'additive-fields'
的总和
例如,令
df
=
|Key1| Key2| Nm1 | Nm2 | AdditiveField1| AdditiveField2| Db_timestamp| source_ts|
|----|-----|-----|-----|---------------|---------------|-------------|----------|
|A |B | Car | Bike| 10 | -0.02 |20231201 |20231127 |
|A |B |Taxi |Cycle|4 |-0.06 |20231201 |20231128 |
|B |C |Xyz |ABC |1 |-20 |20231201 |20231128 |
Staging_table
=
|Key1|Key2|Nm1|Nm2|AdditiveField1|AdditiveField2|Db_timestamp|source_ts|
|--- |----|---|---|--------------|--------------|------------|---------|
| B | C |Old|PQR| 7 | 1 | 20231120 | 20231101|
Here, Key1, Key2 are the keys, NM* are the normal fields on which I just have to take the latest record sorted by source_ts, and the additive fields need to be summed.
|Key1|Key2|Nm1 | Nm2 |AdditiveField1|AdditiveField2|Db_timestamp|source_ts|
|----|----|----|-----|--------------|--------------|------------|---------|
| A | B |Taxi|Cycle| 14 | -0.08 | 20231201 | 20231128|
| B | C |Xyz | ABC | 8 | -19 | 20231201 | 20231128|
编辑:抱歉,无法修复问题中的表结构。 df、staging_df 和输出如下所示 https://prnt.sc/l3OuqsEWgGX-
当我使用标准读写时,我可以轻松地使用以下方法实现此操作:
def implement_additive_fields(df, table_name, primary_keys: list, fields: list):
if check_tableExists(table_name): # returns true if table exists in staging else false.
stmt = f"select * from {table_name}"
staging_df = spark.sql(stmt)
df = df.union(staging_df)
window = Window.partitionBy(primary_keys).orderBy('source_ts').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
For i in fields: #these are the additive fields in the df
df = df.withColumn(i, F.sum(i).over(window))
df = df.withColumn('rank', F.row_number().over(window)).filter(F.col('rank')==1).drop(F.col('rank'))
return df
实现结构化流后,我知道我不能再在我的
window functions
上使用这些df
,我需要使用可用于window functions
的structured streaming
,它有一些时间参数,但我不明白如何实施它们,因为我仍然以 batch
的身份运行这项工作(每天一次),然后它就关闭了。这些表没有实际的流处理作业,我想对来自 sum
表以及流式处理 staging
的记录进行 df
。
您可以使用
foreachBatch
来调用函数并执行转换。
像这样修改你的函数:
def implement_additive_fields(df, table_name, primary_keys: list, fields: list):
if check_tableExists(table_name):
stmt = f"select * from {table_name}"
staging_df = spark.sql(stmt)
df = df.union(staging_df)
window1 = Window.partitionBy(primary_keys).orderBy(F.desc('source_ts'))
window2 = Window.partitionBy(primary_keys).orderBy('source_ts')
for i in fields:
df = df.withColumn(i, F.sum(i).over(window2))
df = df.withColumn('rank', F.row_number().over(window1)).filter(F.col('rank')==1).drop(F.col('rank'))
df.write.saveAsTable("tmp_staged")
print("Done")
并这样称呼它:
primary_keys=['Key1','Key2']
fields=['AdditiveField1','AdditiveField2']
stream_df = spark.readStream.schema(schema=schema).format("csv").option("header","true").load("/csvs/")
stream_df.writeStream.option("checkpointlocation","tmpcheckpointloc").foreachBatch(lambda df,id: implement_additive_fields(df,'',primary_keys=primary_keys,fields=fields)).trigger(once=True).start().awaitTermination()
这是使用的输入数据:
钥匙1 | 钥匙2 | Nm1 | Nm2 | 附加字段1 | 附加字段2 | Db_时间戳 | 来源_ts |
---|---|---|---|---|---|---|---|
A | B | 汽车 | 自行车 | 10 | -0.02 | 20231201 | 20231127 |
A | B | 出租车 | 循环 | 4 | -0.06 | 20231201 | 20231128 |
B | C | Xyz | ABC | 1 | -2.0 | 2023431201 | 20231128 |
输出:
钥匙1 | 钥匙2 | Nm1 | Nm2 | 附加字段1 | 附加字段2 | Db_时间戳 | 来源_ts |
---|---|---|---|---|---|---|---|
A | B | 出租车 | 循环 | 14 | -0.08 | 20231201 | 20231128 |
B | C | Xyz | ABC | 8 | -1 | 2023431201 | 20231128 |