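I have implemented a data pipeline using Auto Loader: bronze --> silver --> gold.
Now, as part of this, I want to perform some data quality checks, and for that I'm using the Great Expectations library.
However, I get the following error when trying to validate the data: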
validator.expect_column_values_to_not_be_null(column="col1")
validator.expect_column_values_to_be_in_set(column="col2", value_set=[1, 6])
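MetricResolutionError: Queries with streaming sources must be executed with writeStream.start();
It looks like Great Expectations can only work with static/batch data.
Can anyone suggest how I can make it work with streaming data?
I followed the approach below in my Databricks notebook to get started with great_expectations: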
from pyspark.sql.functions import col, to_date, date_format
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, DateType
import time
# autoloader table and checkpoint paths
basepath = "/mnt/autoloaderdemodl/datagenerator/"
bronzeTable = basepath + "bronze/"
bronzeCheckpoint = basepath + "checkpoint/bronze/"
bronzeSchema = basepath + "schema/bronze/"
silverTable = basepath + "silver/"
silverCheckpoint = basepath + "checkpoint/silver/"
landingZoneLocation = "/mnt/autoloaderdemodl/datageneratorraw/customerdata_csv"
# Load data from the CSV file using Auto Loader to bronze table using rescue as schema evolution option
raw_df = spark.readStream.format("cloudFiles") \
    .option("cloudFiles.format", "csv") \
    .option("cloudFiles.schemaEvolutionMode", "rescue") \
    .option("Header", True) \
    .option("cloudFiles.schemaLocation", bronzeSchema) \
    .option("cloudFiles.inferSchema", "true") \
    .option("cloudFiles.inferColumnTypes", True) \
    .load(landingZoneLocation)
# Write raw data to the bronze layer
bronze_df = raw_df.writeStream.format("delta") \
    .trigger(once=True) \
    .queryName("bronzeLoader") \
    .option("checkpointLocation", bronzeCheckpoint) \
    .option("mergeSchema", "true") \
    .outputMode("append") \
    .start(bronzeTable)
# Wait for the bronze stream to finish
bronze_df.awaitTermination()
bronze = spark.read.format("delta").load(bronzeTable)
bronze_count = bronze.count()
display(bronze)
print("Number of rows in bronze table: {}".format(bronze_count))
bronze_df = spark.readStream.format("delta").load(bronzeTable)
# Apply date format transformations to the DataFrame
# Transform the date columns
silver_df = bronze_df.withColumn("date1", to_date(col("date1"), "yyyyDDD")) \
    .withColumn("date2", to_date(col("date2"), "yyyyDDD")) \
    .withColumn("date3", to_date(col("date3"), "MMddyy"))
# Write the transformed DataFrame to the Silver layer
silver_stream = silver_df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("mergeSchema", "true") \
    .option("checkpointLocation", silverCheckpoint) \
    .trigger(once=True) \
    .start(silverTable)
# Wait for the write stream to complete
silver_stream.awaitTermination()
# Count the number of rows in the Silver table
silver = spark.read.format("delta").load(silverTable)
display(silver)
silver_count = silver.count()
print("Number of rows in silver table: {}".format(silver_count))
PS - The customer doesn't want to use DLT yet.
Code including the expectation validation:
import great_expectations as ge
from great_expectations.datasource.types import BatchKwargs
bronze_df = spark.readStream.format("delta").load(bronzeTable)
# Apply date format transformations to the DataFrame
# Transform the date columns
silver_df = bronze_df.withColumn("date1", to_date(col("date1"), "yyyyDDD")) \
    .withColumn("date2", to_date(col("date2"), "yyyyDDD")) \
    .withColumn("date3", to_date(col("date3"), "MMddyy"))
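def validate_micro_batch(batch_df, epoch_id):
    print("inside function")
    # Use Great Expectations to validate the batch DataFrame.
    # A plain Spark DataFrame has no expect_* methods, so wrap it first.
    # Assumption: the legacy Dataset API (ge.dataset.SparkDFDataset) is available
    # in the installed Great Expectations version.
    clean_df = ge.dataset.SparkDFDataset(batch_df)
    clean_df.expect_column_values_to_not_be_null(column="col1")
    clean_df.expect_column_values_to_be_between(
        column="col2", min_value=0, max_value=1000
    )
    # silverTable is a path, so write with save() rather than saveAsTable()
    batch_df.write.format("delta").option("mergeSchema", "true").mode("append").save(silverTable)
    # Print the validation results for the batch
    validation_results = clean_df.validate()
    print("Validation results for batch {}:".format(epoch_id))
    print(validation_results)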
# Write the transformed DataFrame to the Silver layer if it passes all expectations
silver_stream = silver_df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .foreachBatch(validate_micro_batch) \
    .option("checkpointLocation", silverCheckpoint) \
    .trigger(once=True) \
    .start()
# Wait for the write stream to complete
silver_stream.awaitTermination()
# Count the number of rows in the Silver table
silver = spark.read.format("delta").load(silverTable)
display(silver)
silver_count = silver.count()
print("Number of rows in silver table: {}".format(silver_count))
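Great Expectations is designed to work with batches of data, so if you want to use it with Spark Structured Streaming, you need to implement your checks inside a function that is passed to the foreachBatch argument of writeStream (see the docs).
It will look something like this: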
def foreach_batch_func(df, epoch):
    # apply GE expectations to df and get clean dataframe
    clean_df = df....
    # silverTable holds a path in the question, so save() is used here
    # instead of saveAsTable()
    clean_df.write.format("delta").option("mergeSchema", "true") \
        .mode("append").save(silverTable)
silver_stream = silver_df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .foreachBatch(foreach_batch_func) \
    .option("checkpointLocation", silverCheckpoint) \
    .trigger(once=True) \
    .start()
But really, for this kind of check, Great Expectations is overkill. And this is something you should discuss adopting Delta Live Tables for.
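For checks as simple as the two in the question, one option is to express them as plain Spark SQL predicates inside the foreachBatch function. This is only a sketch under assumptions: the names `VALID_ROW_RULE`, `split_valid_rows`, and `make_batch_writer`, as well as the idea of a separate quarantine location, are hypothetical. The column names and the 0 to 1000 range are taken from the question.

```python
# Hypothetical rule mirroring the question's two expectations:
# col1 must not be null and col2 must lie in [0, 1000].
VALID_ROW_RULE = "col1 IS NOT NULL AND col2 BETWEEN 0 AND 1000"


def split_valid_rows(batch_df, rule=VALID_ROW_RULE):
    """Return (valid_df, invalid_df) for one micro-batch."""
    valid_df = batch_df.filter(rule)
    # Rows where the rule is false OR evaluates to NULL (for example a
    # NULL col2) are treated as invalid in this sketch.
    invalid_df = batch_df.filter("NOT coalesce(({}), false)".format(rule))
    return valid_df, invalid_df


def make_batch_writer(silver_path, quarantine_path):
    """Build a foreachBatch function that writes valid and invalid rows separately."""
    def write_batch(batch_df, epoch_id):
        valid_df, invalid_df = split_valid_rows(batch_df)
        valid_df.write.format("delta").mode("append").save(silver_path)
        invalid_df.write.format("delta").mode("append").save(quarantine_path)
    return write_batch
```

It could then be wired in with `.foreachBatch(make_batch_writer(silverTable, quarantinePath))`, where `quarantinePath` is a location you would have to define yourself. Keeping the rejected rows makes it possible to inspect why they failed.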
P.S. You may need to add the idempotent write options for Delta.
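As a rough illustration of that P.S.: Delta Lake supports the writer options `txnAppId` and `txnVersion`, which let it skip a micro-batch that has already been committed when foreachBatch replays it after a failure. The helper name `make_idempotent_writer` and the application id value are hypothetical, so treat this as a sketch rather than a drop-in solution.

```python
def make_idempotent_writer(target_path, app_id):
    """Build a foreachBatch function that tags each write with (app_id, epoch_id)."""
    def write_batch(batch_df, epoch_id):
        (
            batch_df.write.format("delta")
            # Unique id for this streaming query; hypothetical value chosen by the caller.
            .option("txnAppId", app_id)
            # The micro-batch id serves as a monotonically increasing version.
            .option("txnVersion", epoch_id)
            .option("mergeSchema", "true")
            .mode("append")
            .save(target_path)
        )
    return write_batch
```

It would be used as `.foreachBatch(make_idempotent_writer(silverTable, "silver-loader"))`. Note that if the checkpoint is deleted and the stream restarts from batch 0, a new `app_id` is needed; otherwise Delta would treat the restarted batches as already written.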