我正在运行一个 DLT(Delta Live Table)作业,该作业为两个单独的表创建青铜表 > 银表。所以最后,我有两个独立的黄金表,我想将它们合并到一个表中。我知道如何在 SQL 中执行此操作,但每次我在 Databricks 上的 Databricks 笔记本中使用 SQL 单元运行作业时,都会出现此错误。
Magic commands (e.g. %py, %sql and %run) are not supported with the exception of
%pip within a Python notebook. Cells containing magic commands are ignored.
Unsupported magic commands were found in the following notebooks
我会在 PySpark 中执行此操作,但它没有创建表功能。
这是我制作青铜桌子的代码
@dlt.table(name="Bronze_or",
comment = "New online retail sales data incrementally ingested from cloud object storage landing zone",
table_properties={
"quality": "bronze",
"pipelines.cdc.tombstoneGCThresholdInSeconds": "2" #reducing the threshold to 2 instead of 5
}
)
def Bronze_or():
return (
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "csv") \
.option("cloudFiles.inferColumnTypes", "true") \
.load("/**Path to raw csv data**/")
)
然后我创建一个视图
expect_list = {"not_null_pk": "id IS NOT NULL", "QuantityNotNeg": "Quantity >= 0"}
@dlt.view(name="Bronze_or_clean_v",
comment="Cleansed bronze retail sales view (i.e. what will become Silver)")
# @dlt.expect("EMPLOYEE_ID", "EMPLOYEE_ID IS NOT NULL")
@dlt.expect_all(expect_list)
# @dlt.expect("valid_address", "address IS NOT NULL")
# @dlt.expect_or_drop("valid_operation", "operation IS NOT NULL")
def Bronze_or_clean_v():
return dlt.read_stream("Bronze_or") \
.withColumn('inputFileName',F.input_file_name()) \
.withColumn('LoadDate',F.lit(datetime.now()))
最后,我创建了银色桌子
dlt.create_target_table(name="Silver_or",
comment="Clean, merged retail sales data",
table_properties={
"quality": "silver",
"pipelines.cdc.tombstoneGCThresholdInSeconds": "2"
}
)
最后,我打造了金桌
@dlt.table(name="Gold_or")
def Gold_or():
return (
dlt.read("Silver_or")
# .filter(expr("current_page_title == 'Apache_Spark'"))
# .withColumnRenamed("previous_page_title", "referrer")
.sort(desc("LIVE.Silver_or.CustomerID"))
.select("LIVE.Silver_or.UnitPrice", "LIVE.Silver_or.Quantity", "LIVE.Silver_or.CustomerID")
#.limit(10)
)
我对两个不同的 CSV 文件运行两次,所以最后,我有两个单独的 Gold 表,但我想将它们合并为一个带有选择列的表。
仅供参考:两个表共享一个外键。
当您在 DLT 中定义表时,它们由 DLT 管理,您无法像您尝试的那样在 DLT 中使用正常的 Databricks SQL 删除或创建表。但您可以使用 DLT 框架通过必要的转换来定义您想要的表。
据我从您的问题和代码片段中了解到,您有 2 组所显示的代码,2 个表各 1 组。您可以将其放在 1 个或 2 个笔记本中,但在同一 DLT 管道中定义。 看来您正在尝试在 2 个 Gold 表之间进行连接。
所以只需创建一个新的sql Notebook并使用以下代码
CREATE OR REFRESH LIVE TABLE Gold_data
AS SELECT * FROM LIVE.gold_or LEFT JOIN LIVE.gold_rc ON gold_or.CustomerID=gold_rc.customer_id
将此笔记本附加到您现有的管道。在 DLT 中,您不能在同一个笔记本中拥有数据块和 SQL。但您可以将它们放在同一 DLT 管道内的单独笔记本中。 DLT 将理解这需要在创建其他 2 个黄金表之后运行,并且它将相应地采取行动。
在 DLT 中,使用实时虚拟模式,如果您引用在管道中创建的表,则需要提及。您可以在此处查看 DLT SQL 的完整语法。