Is there a way to join two Live Tables in Delta Live Tables using Python?

Question  Votes: 0  Answers: 3

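I want to join two silver live tables that are being streamed in order to create a gold table. However, I have run into several errors, including "RuntimeError: Query function must return either a Spark or Koalas DataFrame". I am not sure where I am going wrong, and I would greatly appreciate it if anyone could help resolve this.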

databricks delta-live-tables
3 Answers
2
votes

You can join the tables just as you would DataFrames and return the result as a new table:

import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *


# data_path_data_one and data_path_data_two are placeholders for your own source paths
# First Silver table definition
@dlt.table(
  comment="Silver Table One"
)
def silver_table_one():
  return (spark.read.format("json").load(data_path_data_one))

# Second Silver table definition
@dlt.table(
  comment="Silver Table Two"
)
def silver_table_two():
  # spark.read.csv is a method, so it cannot be chained with .load(); use format("csv") instead
  return (spark.read.format("csv").load(data_path_data_two))


# Joining the two Silver Tables by calling them by the "function" name
@dlt.table(
  comment="Joining Silver Tables"
)
def my_gold_table():
  silver_one = dlt.read("silver_table_one")
  silver_two = dlt.read("silver_table_two")
  return ( 
     silver_one.join(silver_two, silver_one.id == silver_two.id, how="inner")
  )
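
The error quoted in the question is raised when the decorated function hands back something other than a DataFrame, most often None because the return statement is missing or because the last call is an action such as .show(). The sketch below illustrates that rule in plain Python. The table decorator and the FakeDataFrame class are hypothetical stand-ins written for this example, not the real dlt module, so the snippet runs outside a pipeline.

```python
class FakeDataFrame:
    """Hypothetical stand-in for a Spark DataFrame (illustration only)."""

    def __init__(self, rows):
        self.rows = rows


def table(func):
    """Hypothetical stand-in for @dlt.table: checks what the query function returns."""

    def wrapper():
        result = func()
        if not isinstance(result, FakeDataFrame):
            raise RuntimeError("Query function must return a DataFrame")
        return result

    return wrapper


@table
def broken_gold_table():
    joined = FakeDataFrame([{"id": 1}])
    # Bug: there is no return statement, so the function returns None.


@table
def working_gold_table():
    joined = FakeDataFrame([{"id": 1}])
    return joined  # the DataFrame itself is the function's result
```

Calling broken_gold_table() raises the RuntimeError, while working_gold_table() passes the check. In a real pipeline the same idea applies: make sure every function decorated with @dlt.table ends by returning the joined DataFrame.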

0
votes

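Building on the answer from @Axel R, you can also write the join as follows. Passing the column name as a list yields a single id column in the result instead of one from each side: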

@dlt.table(
  comment="Joining Silver Tables"
)
def my_gold_table():
  silver_one = dlt.read("silver_table_one")
  silver_two = dlt.read("silver_table_two")
  return ( 
     silver_one.join(silver_two, ["id"], how="inner")
  )
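
As a rough mental model of what an inner join on id produces, the plain-Python sketch below pairs rows from two small lists of dictionaries and keeps one id key per output row, mirroring the column-name form of the join. The sample rows and the inner_join_on helper are invented for illustration; Spark executes joins in a distributed fashion and does not work this way internally.

```python
def inner_join_on(left_rows, right_rows, key):
    """Return merged rows for every left/right pair that shares the same key value."""
    # Index the right side by key so each left row can look up its matches.
    right_by_key = {}
    for row in right_rows:
        right_by_key.setdefault(row[key], []).append(row)

    joined = []
    for left in left_rows:
        for right in right_by_key.get(left[key], []):
            # Merging the two dicts keeps a single copy of the key column.
            joined.append({**left, **right})
    return joined


# Invented sample data standing in for the two silver tables.
silver_one = [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]
silver_two = [{"id": 2, "clicks": 10}, {"id": 3, "clicks": 7}]

gold = inner_join_on(silver_one, silver_two, "id")
print(gold)
```

Only id 2 appears on both sides, so gold contains a single merged row; rows without a match on the other side are dropped, which is what how="inner" requests.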



0
votes

You can also use Spark SQL, for example:

@dlt.table(
  comment="Joining Silver Tables"
)
def my_gold_table():
  return ( 
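     # Table names must match the function names of the silver tables defined in the pipeline
     spark.sql("SELECT * FROM LIVE.silver_table_one AS a INNER JOIN LIVE.silver_table_two AS b ON b.id = a.id")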
  )

You need to prefix tables defined in the same pipeline with the LIVE virtual schema.
