使用 Delta Live Table (CDC) 增量摄取 Snowflake 数据

问题描述 投票:0回答:1

我有一些数据位于 Snowflake 中,所以我想使用 delta live 表对它们应用 CDC,但我遇到了一些问题。

这就是我正在尝试做的事情:

@dlt.view()
def table1():
   return spark.read.format("snowflake").options(**options).option('query', query).load()

dlt.create_streaming_table(target)
dlt.apply_changes(
source = 'table1'
target = 'target'
....
)

如果我正在读取增量表,则相同的代码运行良好,但如果其雪花出现以下错误

'org.apache.spark.sql.AathesisException:APPLY CHANGES 目标'XXXXX'的源数据必须是流式查询'

您有什么解决方案或解决方法可以帮助我吗?

pyspark snowflake-cloud-data-platform databricks delta-live-tables
1个回答
0
投票

这里回复较晚,但您收到的错误表明您需要将源视为流源,即您应该使用spark.readStream而不是spark.read。

https://docs.databricks.com/en/delta-live-tables/cdc.html#process-scd-type-2-updates

© www.soinside.com 2019 - 2024. All rights reserved.