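I want to join two streaming silver live tables to create a gold table, but I am running into several errors, including: RuntimeError("Query function must return either a Spark or Koalas DataFrame"). I'm not sure where I went wrong, so I'd appreciate it if anyone can help solve this!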
You can join the tables just as you would join DataFrames and return the result as a new table:
import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *

# data_path_data_one and data_path_data_two are assumed to be defined elsewhere.

# First Silver table definition
@dlt.table(
    comment="Silver Table One"
)
def silver_table_one():
    return (spark.read.format("json").load(data_path_data_one))

# Second Silver table definition
@dlt.table(
    comment="Wikipedia clickstream data cleaned and prepared for analysis."
)
def silver_table_two():
    # spark.read.csv is a method, so spark.read.csv.load(...) fails;
    # use format("csv").load(...) instead
    return (spark.read.format("csv").load(data_path_data_two))

# Joining the two Silver tables by referring to them by their function names
@dlt.table(
    comment="Joining Silver Tables"
)
def my_gold_table():
    silver_one = dlt.read("silver_table_one")
    silver_two = dlt.read("silver_table_two")
    return (
        silver_one.join(silver_two, silver_one.id == silver_two.id, how="inner")
    )
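On the error from the question: as far as I can tell, that message shows up whenever the decorated function hands back something that is not a DataFrame, and the easiest way to get there is a function body that builds the join but never returns it (Python then returns None). Below is a pure-Python sketch of that mechanism. The `table` decorator and `FakeDataFrame` class are hypothetical stand-ins written for this illustration, not the real dlt API.

```python
class FakeDataFrame:
    """Hypothetical placeholder for a Spark DataFrame in this sketch."""


def table(func):
    """Hypothetical stand-in decorator: checks what the query function returns."""
    def wrapper():
        result = func()
        if not isinstance(result, FakeDataFrame):
            raise RuntimeError(
                "Query function must return either a Spark or Koalas DataFrame"
            )
        return result
    return wrapper


@table
def broken_gold_table():
    # The "join" result is built but never returned, so the function yields None.
    joined = FakeDataFrame()


@table
def working_gold_table():
    joined = FakeDataFrame()
    return joined  # explicit return satisfies the check


try:
    broken_gold_table()
    broken_raised = False
except RuntimeError:
    broken_raised = True

working_result = working_gold_table()
```

If your own gold table function has every branch ending in a `return` of the joined DataFrame, this particular cause can be ruled out.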
Building on the answer given by @Axel R, you can also write the join as follows:
@dlt.table(
    comment="Joining Silver Tables"
)
def my_gold_table():
    silver_one = dlt.read("silver_table_one")
    silver_two = dlt.read("silver_table_two")
    return (
        silver_one.join(silver_two, ["id"], how="inner")
    )
See here.
You can use Spark SQL, for example:
@dlt.table(
    comment="Joining Silver Tables"
)
def my_gold_table():
    # Table names match the silver tables defined earlier in the thread
    return (
        spark.sql("SELECT * FROM LIVE.silver_table_one AS a INNER JOIN LIVE.silver_table_two AS b ON b.id = a.id")
    )
You need to prefix tables defined in the same pipeline with the LIVE virtual schema.
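One thing to watch with `SELECT *` plus an `ON` condition: both `id` columns end up in the result, and I believe a table write may reject duplicate column names. The sketch below shows the difference between `ON` and `USING (id)`. It uses `sqlite3` from the standard library purely as a stand-in so it runs without a cluster. The table names are borrowed from the first answer and the sample rows are made up. Spark SQL accepts the same `USING` syntax, but treat the exact behaviour in your pipeline as something to verify.

```python
import sqlite3

# In-memory database standing in for the two silver tables (sample data is invented).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE silver_table_one (id INTEGER, left_val TEXT)")
conn.execute("CREATE TABLE silver_table_two (id INTEGER, right_val TEXT)")
conn.executemany("INSERT INTO silver_table_one VALUES (?, ?)", [(1, "a"), (2, "b")])
conn.executemany("INSERT INTO silver_table_two VALUES (?, ?)", [(1, "x"), (3, "y")])

# Join with ON: SELECT * keeps the id column from both sides.
on_cursor = conn.execute(
    "SELECT * FROM silver_table_one AS a "
    "INNER JOIN silver_table_two AS b ON b.id = a.id"
)
on_cols = [d[0] for d in on_cursor.description]

# Join with USING: the shared id column appears only once.
using_cursor = conn.execute(
    "SELECT * FROM silver_table_one "
    "INNER JOIN silver_table_two USING (id)"
)
using_cols = [d[0] for d in using_cursor.description]
using_rows = using_cursor.fetchall()

conn.close()
```

Listing the columns explicitly instead of `SELECT *` is another way to avoid the duplicate `id`.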