我看过有关在表中的几列上使用 DLT 数据质量期望的教程和文章。下面是代码:
@dlt.table(
comment="Wikipedia clickstream data cleaned and prepared for analysis."
)
@dlt.expect("valid_current_page_title", "current_page_title IS NOT NULL")
@dlt.expect_or_fail("valid_count", "click_count > 0")
def clickstream_prepared():
return (
dlt.read("clickstream_raw")
.withColumn("click_count", expr("CAST(n AS INT)"))
.withColumnRenamed("curr_title", "current_page_title")
.withColumnRenamed("prev_title", "previous_page_title")
.select("current_page_title", "click_count", "previous_page_title")
)
在这里,在指定期望时,我们必须手动提及将要实现的列。但我希望它们针对数据框中的所有列运行。
为了实现对数据框中所有列的期望,我使用了一个循环并动态更改了函数名称(充当表名称)。但效率非常低。
for column in columns_list_order_table:
exec(f'''
@dlt.table(comment="null value validations for {column}")
@dlt.expect_or_drop("null values","is_null == false")
def null_validation_orders_for_column_{column}():
df = dlt.read("bronze_orders")
return df.withColumn("is_null", col("{column}").isNull())
''')
@expect_all_or_fail(expectations)
声明一个或多个数据质量约束。
是一个Python字典,其中key是期望描述,value是期望约束。如果一行违反了任何期望,请立即停止执行。expectations
为所有列创建例外,如下所示。
expectations = {}
for i in columns_list:
expectations["valid_" + i] = f"{i} IS NOT NULL"
expectations
输出:
{'valid_state': 'state IS NOT NULL',
'valid_store_id': 'store_id IS NOT NULL',
'valid_product_category': 'product_category IS NOT NULL',
'valid_SKU': 'SKU IS NOT NULL',
'valid_price': 'price IS NOT NULL'}
然后,将其添加到您的代码中,如下所示。
columns_list = ['state', 'store_id', 'product_category', 'SKU', 'price']
expectations = {}
for i in columns_list:
expectations["valid_" + i] = f"{i} IS NOT NULL"
expectations
@dlt.table()
@dlt.expect_all_or_fail(expectations)
def clickstream_raw():
return (spark.read.schema(schema).option("header", "true").format("csv").load("/path/csv/"))
如果任何一列包含 null,则会引发异常违规错误。如果您使用的是 CSV 数据,请将
header
选项设置为 true
,因为如果您不提供它,则会引发错误,因为列名称将包含在记录中。