创建自定义 TableExpectation Great_expectations

问题描述 投票:0回答:1

我正在努力实现我的自定义期望。我使用:python、s3、spark、glue。 我描述了我的定制期望:

from great_expectations.expectations.expectation import TableExpectation
from great_expectations.core.expectation_configuration import ExpectationConfiguration
from great_expectations.execution_engine import ExecutionEngine

class ExpectUniqueIdsWithNullEnd(TableExpectation):

    
    library_metadata = {
        "maturity": "production",
        "package": "custom",
        "tags": ["id uniqueness", "business logic"],
        "contributors": ["Your Name <[email protected]>"],
    }

    def _validate(
        self,
        configuration: ExpectationConfiguration,
        data: DataFrame,
        runtime_configuration: dict = None,
        execution_engine: ExecutionEngine = None,
        metrics: dict = None, 
    ):
        column = configuration.kwargs.get("column", "id")
        ended_at_column = configuration.kwargs.get("ended_at_column", "ended_at")
        print("Hello")
        print(column)
        print(ended_at_column)
        filtered_data = data.filter(F.col(ended_at_column).isNull())
        id_counts = filtered_data.groupBy(column).count()
        non_unique_ids = id_counts.filter(F.col("count") > 1).select(column)
        
        if non_unique_ids.count() > 0:
            return {
                "success": False,
                "unexpected_list": non_unique_ids.collect()
            }
        else:
            return {
                "success": True
            }

from great_expectations.expectations.registry import register_expectation
register_expectation(ExpectUniqueIdsWithNullEnd)

from great_expectations.core.expectation_configuration import ExpectationConfiguration

expectation_config = ExpectationConfiguration(
    expectation_type="expect_unique_ids_with_null_end",
    kwargs={
        "column": "id",
        "ended_at_column": "ended_at"
    }
)

接下来,我尝试使用批量请求和验证器来调用它

batch_request = RuntimeBatchRequest(
    datasource_name="spark_s3",
    data_asset_name="data_asset_name",
    batch_identifiers={"default_identifier_name": "default_identifier_name"},
    data_connector_name="default_runtime_data_connector_name",
    runtime_parameters={"path": "table_path"},
    batch_spec_passthrough={"reader_method": "delta", "reader_options": {"header": True}},
)
validator = context_gx.get_validator(
    batch_request=batch_request,
    expectation_suite_name=expectation_suite_name
)
# Додавання до suite
expectation_suite_name = "haistruk_test_suite"
suite = context_gx.get_expectation_suite(expectation_suite_name)
suite.add_expectation(expectation_config)
context_gx.save_expectation_suite(suite, expectation_suite_name)
results = validator.validate()
print("Results of the custom expectation:", results)

但我收到以下错误:

metrics_validate 中的第 1100 行 ] = self._validate( 类型错误: ExpectUniqueIdsWithNullEnd._validate() 缺少 1 个必需的位置 参数:“数据” ”

尝试了一切都找不到问题,请帮忙。

python pyspark aws-glue great-expectations
1个回答
0
投票

我从_validate函数的参数中删除了数据。 现在我使用以下代码获取日期范围

data = execution_engine.dataframe
© www.soinside.com 2019 - 2024. All rights reserved.