I'm creating a Delta Live Table, and I want to add column-level metadata comments.
Here is my code:
@dlt.table(
    comment="Flattened table for Student data",
    name='Flattened_table'
)
def flatten():
    df = spark.readStream.format("delta").load("url")
    column_descriptions_dict = {"colname1": "comment for colname 1", "colname2": "comment for colname 2"}
    for field in df.schema.fields:
        df = df.withColumn(field.name, col(field.name).alias(field.name, metadata={"comment": column_descriptions_dict[field.name]}))
    return df
But when I inspect the DLT table, I don't see any comments (metadata) on my columns.
Does a DLT table not honor PySpark column metadata?
You need to add a schema to the @dlt.table definition, like this:
import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
from delta.tables import *

deltaTable = DeltaTable.forPath(spark, "dbfs:/dlt_temp/tables/clickstream_raw")
fields = deltaTable.toDF().schema.fields

column_descriptions_dict = {
    "state": "state of store",
    "store_id": "store id",
    "product_category": "product cat",
    "SKU": "SKU",
    "price": "total price"
}

new_sc = []
for field in fields:
    sc = StructField(field.name, field.dataType, True, {'comment': column_descriptions_dict[field.name]})
    new_sc.append(sc)

new_schema = StructType(new_sc)

@dlt.table(
    comment="Flattened table for Student data",
    name='flatten_student',
    schema=new_schema
)
def flatten():
    df = spark.readStream.format("delta").load("dbfs:/dlt_temp/tables/clickstream_raw")
    return df
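One caveat with the loop above: it raises KeyError for any column that is missing from column_descriptions_dict. A defensive variant of the same mapping logic is sketched below in plain Python, using a hypothetical Field dataclass as a stand-in for StructField so the sketch runs without a Spark installation:

```python
from dataclasses import dataclass

# Hypothetical stand-in for pyspark's StructField, just for this sketch.
@dataclass
class Field:
    name: str
    data_type: str

column_descriptions_dict = {"state": "state of store", "store_id": "store id"}
fields = [Field("state", "string"), Field("store_id", "string"), Field("price", "float")]

# dict.get with a default means columns without a description don't raise KeyError.
annotated = [
    (f.name, f.data_type, {"comment": column_descriptions_dict.get(f.name, "")})
    for f in fields
]
```

With real StructField objects the same pattern applies: pass `column_descriptions_dict.get(field.name, "")` when building the metadata dict for new_schema.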
Then the comments show up correctly.
Output: