在 Databricks 中的 Delta Live 表作业中在 Pyspark 中创建表

问题描述 投票:0回答:2

我正在运行一个 DLT(Delta Live Table)作业,该作业为两个单独的表创建青铜表 > 银表。所以最后,我有两个独立的黄金表,我想将它们合并到一个表中。我知道如何在 SQL 中执行此操作,但每次我在 Databricks 上的 Databricks 笔记本中使用 SQL 单元运行作业时,都会出现此错误。

Magic commands (e.g. %py, %sql and %run) are not supported with the exception of
%pip within a Python notebook. Cells containing magic commands are ignored.
Unsupported magic commands were found in the following notebooks

我会在 PySpark 中执行此操作,但它没有创建表功能。

这是我制作青铜桌子的代码

@dlt.table(name="Bronze_or",
                  comment = "New online retail sales data incrementally ingested from cloud object storage landing zone",
  table_properties={
    "quality": "bronze",
    "pipelines.cdc.tombstoneGCThresholdInSeconds": "2" #reducing the threshold to 2 instead of 5
  }
)
def Bronze_or():
  return (
    spark.readStream.format("cloudFiles") \
      .option("cloudFiles.format", "csv") \
      .option("cloudFiles.inferColumnTypes", "true") \
      .load("/**Path to raw csv data**/")
  )

然后我创建一个视图

expect_list = {"not_null_pk": "id IS NOT NULL", "QuantityNotNeg": "Quantity >= 0"}

@dlt.view(name="Bronze_or_clean_v",
  comment="Cleansed bronze retail sales view (i.e. what will become Silver)")

# @dlt.expect("EMPLOYEE_ID", "EMPLOYEE_ID IS NOT NULL")
@dlt.expect_all(expect_list) 
# @dlt.expect("valid_address", "address IS NOT NULL")
# @dlt.expect_or_drop("valid_operation", "operation IS NOT NULL")

def Bronze_or_clean_v():
  return dlt.read_stream("Bronze_or") \
  .withColumn('inputFileName',F.input_file_name()) \
  .withColumn('LoadDate',F.lit(datetime.now()))

最后,我创建了银色桌子

dlt.create_target_table(name="Silver_or",
  comment="Clean, merged retail sales data",
  table_properties={
    "quality": "silver",
    "pipelines.cdc.tombstoneGCThresholdInSeconds": "2"
  }
)

最后,我打造了金桌

@dlt.table(name="Gold_or")
def Gold_or():
  return (
    dlt.read("Silver_or")
#       .filter(expr("current_page_title == 'Apache_Spark'"))
#       .withColumnRenamed("previous_page_title", "referrer")
      .sort(desc("LIVE.Silver_or.CustomerID"))
      .select("LIVE.Silver_or.UnitPrice", "LIVE.Silver_or.Quantity", "LIVE.Silver_or.CustomerID")
      #.limit(10)
  )

我对两个不同的 CSV 文件运行两次,所以最后,我有两个单独的 Gold 表,但我想将它们合并为一个带有选择列的表。

仅供参考:两个表共享一个外键。

pyspark databricks delta-lake delta-live-tables data-lakehouse
2个回答
0
投票

您是否考虑过将其写在 SQL Notebook 中?这样你就可以摆脱%sql魔法命令,直接用SQL编写了。

查看截图了解如何切换:


0
投票

当您在 DLT 中定义表时,它们由 DLT 管理,您无法像您尝试的那样在 DLT 中使用正常的 Databricks SQL 删除或创建表。但您可以使用 DLT 框架通过必要的转换来定义您想要的表。

据我从您的问题和代码片段中了解到,您有 2 组所显示的代码,2 个表各 1 组。您可以将其放在 1 个或 2 个笔记本中,但在同一 DLT 管道中定义。 看来您正在尝试在 2 个 Gold 表之间进行连接。

所以只需创建一个新的sql Notebook并使用以下代码

CREATE OR REFRESH LIVE TABLE Gold_data
AS SELECT * FROM LIVE.gold_or LEFT JOIN LIVE.gold_rc ON gold_or.CustomerID=gold_rc.customer_id

将此笔记本附加到您现有的管道。在 DLT 中,您不能在同一个笔记本中拥有数据块和 SQL。但您可以将它们放在同一 DLT 管道内的单独笔记本中。 DLT 将理解这需要在创建其他 2 个黄金表之后运行,并且它将相应地采取行动。

在 DLT 中,使用实时虚拟模式,如果您引用在管道中创建的表,则需要提及。您可以在此处查看 DLT SQL 的完整语法。

© www.soinside.com 2019 - 2024. All rights reserved.