我正在努力实现我的自定义期望。我使用:python、s3、spark、glue。 我描述了我的定制期望:
from great_expectations.expectations.expectation import TableExpectation
from great_expectations.core.expectation_configuration import ExpectationConfiguration
from great_expectations.execution_engine import ExecutionEngine
class ExpectUniqueIdsWithNullEnd(TableExpectation):
library_metadata = {
"maturity": "production",
"package": "custom",
"tags": ["id uniqueness", "business logic"],
"contributors": ["Your Name <[email protected]>"],
}
def _validate(
self,
configuration: ExpectationConfiguration,
data: DataFrame,
runtime_configuration: dict = None,
execution_engine: ExecutionEngine = None,
metrics: dict = None,
):
column = configuration.kwargs.get("column", "id")
ended_at_column = configuration.kwargs.get("ended_at_column", "ended_at")
print("Hello")
print(column)
print(ended_at_column)
filtered_data = data.filter(F.col(ended_at_column).isNull())
id_counts = filtered_data.groupBy(column).count()
non_unique_ids = id_counts.filter(F.col("count") > 1).select(column)
if non_unique_ids.count() > 0:
return {
"success": False,
"unexpected_list": non_unique_ids.collect()
}
else:
return {
"success": True
}
from great_expectations.expectations.registry import register_expectation
register_expectation(ExpectUniqueIdsWithNullEnd)
from great_expectations.core.expectation_configuration import ExpectationConfiguration
expectation_config = ExpectationConfiguration(
expectation_type="expect_unique_ids_with_null_end",
kwargs={
"column": "id",
"ended_at_column": "ended_at"
}
)
接下来,我尝试使用批量请求和验证器来调用它
batch_request = RuntimeBatchRequest(
datasource_name="spark_s3",
data_asset_name="data_asset_name",
batch_identifiers={"default_identifier_name": "default_identifier_name"},
data_connector_name="default_runtime_data_connector_name",
runtime_parameters={"path": "table_path"},
batch_spec_passthrough={"reader_method": "delta", "reader_options": {"header": True}},
)
validator = context_gx.get_validator(
batch_request=batch_request,
expectation_suite_name=expectation_suite_name
)
# Додавання до suite
expectation_suite_name = "haistruk_test_suite"
suite = context_gx.get_expectation_suite(expectation_suite_name)
suite.add_expectation(expectation_config)
context_gx.save_expectation_suite(suite, expectation_suite_name)
results = validator.validate()
print("Results of the custom expectation:", results)
但我收到以下错误:
metrics_validate 中的第 1100 行 ] = self._validate( 类型错误: ExpectUniqueIdsWithNullEnd._validate() 缺少 1 个必需的位置 参数:“数据” ”
尝试了一切都找不到问题,请帮忙。
我从_validate函数的参数中删除了数据。 现在我使用以下代码获取日期范围
data = execution_engine.dataframe