Spark Databricks:流-流 LeftOuter Join 返回空结果

问题描述 投票:0回答:1

Databricks,带有 Delta Live Tables,Spark 3.4

我有一个包含一些记录的流数据帧(我们称之为“原始”)。然后,我根据某些条件过滤此表,修改一些列值并获取新的“修改后”数据框。

我想以“修改”中的记录替换“原始”中相应记录的方式合并这两个数据框。我现在采取的方法是从原始数据框中“减去”修改后的数据框,然后将结果与“修改后的”数据框合并。

每条记录都有一个 ID 字段。

我很快意识到我想要实现的目标可以通过 pyspark 的

subtract()
函数或左反连接来完成。但是,如果右侧数据帧是流式数据帧,则这两种方法都不支持。因此,我尝试用左外连接复制左反连接:

subtracted = original.join(modified, original['ID'] == modified['ID_mod'], 'leftOuter') \
                     .where(modified['ID_mod'].isNull()).select(original['*'])

但是,然后我收到一条错误消息,指出流-流左外连接仅支持水印和时间范围。因此,根据 Spark 的文档,我执行了以下操作:

@dlt.table
def final_records():
    # origTime and modTime are two timestamp columns
    original = dlt.readStream("original_table").withWatermark('origTime', '2 hours')
    modified = dlt.readStream("modified_table").withWatermark('modTime', '3 hours')
    
    # Should give me original without modified records
    subtracted = original.join(modified, expr("""
                     ID = ID_mod AND
                     modTime >= origTime AND
                     modTime <= origTime + interval 1 hour
                 """), 'leftOuter') \
                 .where(modified['ID_mod'].isNull()).select(original['*'])

    return subtracted.unionByName(modified, allowMissingColumns=True)

但是,当我运行 DLT 管道时,在

original
中有 4 条记录、
modified
中有 0 条记录的情况下,我在
subtracted
中得到 0 条记录,而我希望结果中有 4 条记录。因此,工会的记录也为零。

这可能是什么问题?

python apache-spark pyspark databricks delta-live-tables
1个回答
0
投票

经过进一步调查,我们确定在其中一个数据帧(在我们的例子中是右侧“修改”表)为空的情况下,连接将不会成功,因为 Spark 无法确定有效的水印。因此,它从连接返回一个空白数据帧。

© www.soinside.com 2019 - 2024. All rights reserved.