Databricks,带有 Delta Live Tables,Spark 3.4
我有一个包含一些记录的流数据帧(我们称之为“原始”)。然后,我根据某些条件过滤此表,修改一些列值并获取新的“修改后”数据框。
我想以“修改”中的记录替换“原始”中相应记录的方式合并这两个数据框。我现在采取的方法是从原始数据框中“减去”修改后的数据框,然后将结果与“修改后的”数据框合并。
每条记录都有一个 ID 字段。
我很快意识到我想要实现的目标可以通过 pyspark 的
subtract()
函数或左反连接来完成。但是,如果右侧数据帧是流式数据帧,则这两种方法都不支持。因此,我尝试用左外连接复制左反连接:
subtracted = original.join(modified, original['ID'] == modified['ID_mod'], 'leftOuter') \
.where(modified['ID_mod'].isNull()).select(original['*'])
但是,然后我收到一条错误消息,指出流-流左外连接仅支持水印和时间范围。因此,根据 Spark 的文档,我执行了以下操作:
@dlt.table
def final_records():
# origTime and modTime are two timestamp columns
original = dlt.readStream("original_table").withWatermark('origTime', '2 hours')
modified = dlt.readStream("modified_table").withWatermark('modTime', '3 hours')
# Should give me original without modified records
subtracted = original.join(modified, expr("""
ID = ID_mod AND
modTime >= origTime AND
modTime <= origTime + interval 1 hour
"""), 'leftOuter') \
.where(modified['ID_mod'].isNull()).select(original['*'])
return subtracted.unionByName(modified, allowMissingColumns=True)
但是,当我运行 DLT 管道时,在
original
中有 4 条记录、modified
中有 0 条记录的情况下,我在 subtracted
中得到 0 条记录,而我希望结果中有 4 条记录。因此,工会的记录也为零。
这可能是什么问题?
经过进一步调查,我们确定在其中一个数据帧(在我们的例子中是右侧“修改”表)为空的情况下,连接将不会成功,因为 Spark 无法确定有效的水印。因此,它从连接返回一个空白数据帧。