我有一些数据位于 Snowflake 中,所以我想使用 delta live 表对它们应用 CDC,但我遇到了一些问题。
这就是我正在尝试做的事情:
@dlt.view()
def table1():
return spark.read.format("snowflake").options(**options).option('query', query).load()
dlt.create_streaming_table(target)
dlt.apply_changes(
source = 'table1'
target = 'target'
....
)
如果我正在读取增量表,则相同的代码运行良好,但如果其雪花出现以下错误
'org.apache.spark.sql.AathesisException:APPLY CHANGES 目标'XXXXX'的源数据必须是流式查询'
您有什么解决方案或解决方法可以帮助我吗?
这里回复较晚,但您收到的错误表明您需要将源视为流源,即您应该使用spark.readStream而不是spark.read。
https://docs.databricks.com/en/delta-live-tables/cdc.html#process-scd-type-2-updates