在增量实时表中执行Spark sql

问题描述 投票:0回答:1

我是 DLT 新手,正在尝试掌握它。我写了下面的代码。我有两个流表(temp1 和 temp2)。我正在从这些表中创建两个视图。然后,我使用 sql 加入这些视图来创建视图 temp3。然后我将视图 temp3 填充到 Final_table。

@dlt.view
def temp1():
    return spark.readStream.table("catalog1.schema1.table1")
    
@dlt.view
def temp2():
    return spark.readStream.table("catalog1.schema1.table2.filter(col("field1") == "test"")
    
@dlt.view
def temp3():
    return spark.sql("select * from temp1 left join temp 2 on temp1.id = temp2.id")

@dlt.create_table(
         name="final_table",
       table_properties={
        "quality": "silver",
        "delta.autoOptimize.optimizeWrite": "true",
        "delta.autoOptimize.autoCompact": "true",
        },
    )
def populate_final_table():
    final_df = dlt.read("temp3")
    return final_df

我在这里遇到的错误是“无法读取 temp3。数据集未在管道中定义。”有人可以帮我吗?

pyspark databricks delta-lake delta-live-tables
1个回答
0
投票

temp3
视图定义中的SQL查询引用
temp 2
而不是
temp2
(注意空格字符)。

此外,在

temp2
视图定义中尝试将过滤器移到外部,如下所示:

@dlt.view
def temp2():
    return spark.readStream.table("catalog1.schema1.table2").filter(col("field1") == "test")

除此之外,你的代码对我来说看起来很好。

© www.soinside.com 2019 - 2024. All rights reserved.