Great Expectations with Databricks Auto Loader

Problem description

I have implemented a data pipeline using Auto Loader, bronze --> silver --> gold.

Now, as part of this, I want to perform some data quality checks, and for that I am using the Great Expectations library.

However, I am running into the following error when I try to validate the data:

validator.expect_column_values_to_not_be_null(column="col1")
validator.expect_column_values_to_be_in_set(column="col2", value_set=[1,6])

MetricResolutionError: Queries with streaming sources must be executed with writeStream.start();

It looks like Great Expectations can only work with static/batch data.
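
This is really a Spark Structured Streaming restriction rather than something specific to Great Expectations: any eager action (count(), collect(), toPandas()) on a streaming DataFrame raises the same error, and GE has to run such actions to resolve its metrics. A minimal sketch of the same failure without GE (the path is only a placeholder):

# Any eager action on a streaming DataFrame fails the same way,
# with or without Great Expectations.
stream_df = spark.readStream.format("delta").load("/some/delta/path")
stream_df.count()
# AnalysisException: Queries with streaming sources must be executed with writeStream.start();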

Can someone suggest how I can make this work for streaming data?

I set up great_expectations in my Databricks notebook following the guide below:

https://docs.greatexpectations.io/docs/deployment_patterns/how_to_use_great_expectations_in_databricks/

from pyspark.sql.functions import col, to_date, date_format
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, DateType
import time
 
# autoloader table and checkpoint paths
basepath = "/mnt/autoloaderdemodl/datagenerator/"
bronzeTable = basepath + "bronze/"
bronzeCheckpoint = basepath + "checkpoint/bronze/"
bronzeSchema = basepath + "schema/bronze/"
silverTable = basepath + "silver/"
silverCheckpoint = basepath + "checkpoint/silver/"
landingZoneLocation = "/mnt/autoloaderdemodl/datageneratorraw/customerdata_csv"
 
# Load data from the CSV file using Auto Loader to bronze table using rescue as schema evolution option
raw_df = spark.readStream.format("cloudFiles") \
            .option("cloudFiles.format", "csv") \
            .option("cloudFiles.schemaEvolutionMode", "rescue") \
            .option("Header", True) \
            .option("cloudFiles.schemaLocation", bronzeSchema) \
            .option("cloudFiles.inferSchema", "true") \
            .option("cloudFiles.inferColumnTypes", True) \
            .load(landingZoneLocation)

# Write raw data to the bronze layer
bronze_stream = raw_df.writeStream.format("delta") \
            .trigger(once=True) \
            .queryName("bronzeLoader") \
            .option("checkpointLocation", bronzeCheckpoint) \
            .option("mergeSchema", "true") \
            .outputMode("append") \
            .start(bronzeTable)
# Wait for the bronze stream to finish
bronze_stream.awaitTermination()
bronze = spark.read.format("delta").load(bronzeTable)
bronze_count = bronze.count()
display(bronze)
print("Number of rows in bronze table: {}".format(bronze_count))
 
 
bronze_df = spark.readStream.format("delta").load(bronzeTable)
 
# Apply date format transformations to the DataFrame
# Transform the date columns
silver_df = bronze_df.withColumn("date1", to_date(col("date1"), "yyyyDDD"))\
                     .withColumn("date2", to_date(col("date2"), "yyyyDDD"))\
                     .withColumn("date3", to_date(col("date3"), "MMddyy"))
 
# Write the transformed DataFrame to the Silver layer
silver_stream  = silver_df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("mergeSchema", "true") \
    .option("checkpointLocation", silverCheckpoint) \
    .trigger(once=True) \
    .start(silverTable)
 

# Wait for the write stream to complete
silver_stream.awaitTermination()
# Count the number of rows in the Silver table
silver = spark.read.format("delta").load(silverTable)
display(silver)
silver_count = silver.count()
print("Number of rows in silver table: {}".format(silver_count))

PS - the customer does not want to use DLT yet.

Code including the expectation validation:

import great_expectations as ge
from great_expectations.datasource.types import BatchKwargs

bronze_df = spark.readStream.format("delta").load(bronzeTable)

# Apply date format transformations to the DataFrame
# Transform the date columns
silver_df = bronze_df.withColumn("date1", to_date(col("date1"), "yyyyDDD"))\
                     .withColumn("date2", to_date(col("date2"), "yyyyDDD"))\
                     .withColumn("date3", to_date(col("date3"), "MMddyy"))
def validate_micro_batch(batch_df, epoch):
    print("inside function")
    # Use Great Expectations to validate the batch DataFrame
    clean_df = batch_df
    clean_df.expect_column_values_to_not_be_null(column="col1")
    clean_df.expect_column_values_to_be_between(
        column="col2", min_value=0, max_value=1000
    )
    clean_df.write.format("delta").option("mergeSchema", "true").mode("append").saveAsTable(silverTable)
    # Print the validation results for the batch
    validation_results = clean_df.validate()
    print("Validation results for batch {}:".format(batch_id))
    print(validation_results)
    
# Write the transformed DataFrame to the Silver layer if it passes all expectations
silver_stream = silver_df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .foreachBatch(validate_micro_batch) \
    .option("checkpointLocation", silverCheckpoint) \
    .trigger(once=True) \
    .start()
# Wait for the write stream to complete
silver_stream.awaitTermination()
# Count the number of rows in the Silver table
silver = spark.read.format("delta").load(silverTable)
display(silver)
silver_count = silver.count()
print("Number of rows in silver table: {}".format(silver_count))
Tags: azure-databricks, spark-structured-streaming, great-expectations, data-quality, databricks-autoloader
1 Answer

1 vote

Great Expectations is designed to work with batch data, so if you want to use it with Spark Structured Streaming, you need to implement your checks inside a function that is passed to the foreachBatch argument of writeStream (doc).

It would look something like this:

def foreach_batch_func(df, epoch):
  # apply GE expectations to df and get clean dataframe
  clean_df = df....
  clean_df.write.format("delta").option("mergeSchema", "true") \
    .mode("append").saveAsTable(silverTable)

silver_stream  = silver_df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .foreachBatch(foreach_batch_func) \
    .option("checkpointLocation", silverCheckpoint) \
    .trigger(once=True) \
    .start()
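
For illustration only, here is one way the GE part of that function could look, using the legacy SparkDFDataset wrapper that ships with older Great Expectations releases (a sketch, not a definitive implementation; the exact API depends on your GE version, and the columns/values are simply the ones from the question):

from great_expectations.dataset import SparkDFDataset

def foreach_batch_func(df, epoch_id):
    # Each micro-batch arrives here as a plain (static) DataFrame,
    # so the batch-oriented GE API can be applied to it directly.
    ge_df = SparkDFDataset(df)
    ge_df.expect_column_values_to_not_be_null(column="col1")
    ge_df.expect_column_values_to_be_in_set(column="col2", value_set=[1, 6])
    results = ge_df.validate()

    if results.success:
        # silverTable is a path in the question, so write with save() rather than saveAsTable()
        df.write.format("delta") \
            .option("mergeSchema", "true") \
            .mode("append") \
            .save(silverTable)
    else:
        print("Validation failed for batch {}: {}".format(epoch_id, results))

With newer GE versions the SparkDFDataset wrapper is gone and you would build a Validator from a data context inside the function instead, but the pattern is the same: wrap the static micro-batch, validate it, then write it out.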

But really, for checks like these Great Expectations is a bit of overkill; realistically you should push for adopting Delta Live Tables for this.
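
For comparison, in DLT the same checks become expectation decorators on the table definition; a rough sketch (the "bronze" and "silver" table names are assumptions, standing in for the question's paths):

import dlt
from pyspark.sql.functions import col, to_date

@dlt.table(name="silver")
@dlt.expect_or_drop("col1_not_null", "col1 IS NOT NULL")
@dlt.expect_or_drop("col2_in_set", "col2 IN (1, 6)")
def silver():
    # Rows violating an expect_or_drop expectation are dropped and reported
    # in the pipeline's data quality metrics automatically.
    # "bronze" is assumed to be another table defined in the same pipeline.
    return (
        dlt.read_stream("bronze")
            .withColumn("date1", to_date(col("date1"), "yyyyDDD"))
            .withColumn("date2", to_date(col("date2"), "yyyyDDD"))
            .withColumn("date3", to_date(col("date3"), "MMddyy"))
    )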

P.S. You may also need to add the idempotent writes option for Delta.
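
On that point: with foreachBatch, Delta can skip a micro-batch that was already committed before a failure/restart if you tag each write with a transaction application id and version (supported on recent Delta Lake / Databricks Runtime versions). A sketch, reusing the epoch id; the app_id value is just an illustrative name:

app_id = "silver_quality_stream"   # any stable id, unique per streaming query and target table

def foreach_batch_func(df, epoch_id):
    # txnAppId + txnVersion let Delta detect and skip a micro-batch
    # that was already written, making the append idempotent on replay.
    df.write.format("delta") \
        .option("txnAppId", app_id) \
        .option("txnVersion", epoch_id) \
        .option("mergeSchema", "true") \
        .mode("append") \
        .save(silverTable)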
