带有增量表的 Spark 流 - 更新微批次中的输入表

问题描述 投票:0回答:1

使用两个增量表(tableA,tableB)作为流管道的输入,我想实现以下目标:

  1. 当表 A 中出现新行时开始处理(而不是当表 B 更新时)
  2. mergedTable = tableA.join(tableB, ...., "内部")
  3. 对 mergedTable 进行一些转换
  4. 根据转换将新行追加到 tableB

我从以下开始:

tableA = spark.readstream.format("delta").load(path_to_tableA)
tableB = spark.readstream.format("delta").load(path_to_tableB)
mergedTable = tableA.join(tableB, ...., "inner")

def process_microbatch(df, batch_id):
    ...transformations on df...
    df.write.mode("append").saveAsTable(path_to_tableB)

mergedTable.writeStream.foreachBatch(process_microbatch).start()

我如何确保只有 tableA 的更新才会触发微批量处理?当然也很重要的是,tableB 的新行在下一批的第 2 点中被识别。

pyspark spark-streaming azure-databricks
1个回答
0
投票

如果

tableB
仅在流开头加载一次且此后未更新,则微批次内对其所做的任何更改都不会反映在后续微批次中。为了解决这个问题,您需要确保在每个微批次中重新加载
tableB
,以便它包含上一个微批次中所做的更新。

因此,在

tableB
函数中重新加载
process_microbatch

这是代码。

from pyspark.sql.functions import expr

# Read tableA and apply watermarking
tableA = spark.readStream.format("delta").load(path_to_tableA)

# Define the processing function
def process_microbatch(df, batch_id):

    # Read tableB within the processing function
    tableB = spark.read.format("delta").load(path_to_tableB)
    # Join tables
    mergedTable = df.join(tableB, ...., "inner")
    
    transformed_df = ...
    transformed_df.write.mode("append").format("delta").save(path_to_tableB)


streamingQuery = tableA.writeStream.foreachBatch(process_microbatch)
    .start()
streamingQuery.awaitTermination()

通过在每个微批次中重新加载

tableB
,您可以确保在之前的微批次中对其所做的任何更改都会在后续微批次中得到考虑。

© www.soinside.com 2019 - 2024. All rights reserved.