我是 DLT 新手,正在尝试掌握它。我写了下面的代码。我有两个流表(temp1 和 temp2)。我正在从这些表中创建两个视图。然后,我使用 sql 加入这些视图来创建视图 temp3。然后我将视图 temp3 填充到 Final_table。
@dlt.view
def temp1():
return spark.readStream.table("catalog1.schema1.table1")
@dlt.view
def temp2():
return spark.readStream.table("catalog1.schema1.table2.filter(col("field1") == "test"")
@dlt.view
def temp3():
return spark.sql("select * from temp1 left join temp 2 on temp1.id = temp2.id")
@dlt.create_table(
name="final_table",
table_properties={
"quality": "silver",
"delta.autoOptimize.optimizeWrite": "true",
"delta.autoOptimize.autoCompact": "true",
},
)
def populate_final_table():
final_df = dlt.read("temp3")
return final_df
我在这里遇到的错误是“无法读取 temp3。数据集未在管道中定义。”有人可以帮我吗?
temp3
视图定义中的SQL查询引用temp 2
而不是temp2
(注意空格字符)。
此外,在
temp2
视图定义中尝试将过滤器移到外部,如下所示:
@dlt.view
def temp2():
return spark.readStream.table("catalog1.schema1.table2").filter(col("field1") == "test")
除此之外,你的代码对我来说看起来很好。