我正在尝试使用 Pandera 库来定义数据集的数据质量检查。 数据集作为极坐标数据框加载,并使用 PanderaPolars 来定义检查。
对于某些列检查,我们需要在多列上设置联合条件。 例如
使用具有 BUY_SELL(str、B 或 S)、Product_TYPE(str、REV 或 XX)、QUANTITY(浮动)的文件
我想定义以下检查:
我尝试使用 Pandas(不是 Polars)按照下面的方式提出 2 个自定义检查,但它们没有按预期工作。在 Polars 中,我无法编写等效的内容,因为似乎没有 lambda 可用
import pandera as pa
import pandas as pd
import warnings
df = pd.read_csv('file.csv')
check_quantity_grouped = pa.Check(
lambda g: g[(True, "B")] > 10,
groupby=lambda df: (
df.assign(product_type_rev=lambda d: d["PRODUCT_TYPE"] == "REV")
.groupby(["product_type_rev", "BUY_SELL"])
)
,ignore_na=True,raise_warning=False, error="trade quantity, when product_type = REV and BUY_SELL = B, is less than 10"
)
check_quantity_filter = pa.Check(
lambda df: df[(df['PRODUCT_TYPE'] == 'REV') & (df['BUY_SELL'] == 'B')] > 10
,ignore_na=True,raise_warning=False
)
schema = pa.DataFrameSchema({
"QUANTITY": pa.Column(float, [check_quantity_filter, check_quantity_grouped], nullable=True)
})
try:
schema(df, lazy=True)
except pa.errors.SchemaErrors as exc:
filtered_df = df[df.index.isin(exc.failure_cases["index"])]
failures = exc.failure_cases
print(f"filtered df:\n{filtered_df}")
print(failures)
我的问题是在 Pandera 中是否可以进行更复杂的检查,例如上面列出的检查(理想情况下是极地,但如果不是 Pandas 也可以)
我以前没有使用过 Pandera,所以这可能可以改进。
根据这些示例:https://pandera.readthedocs.io/en/latest/polars.html#dataframe-level-checks
您定义一个传递给
data
的函数,其中包含一个 .lazyframe
,您可以在其中运行您的 Polars 代码。
df = pl.read_csv(b"""
PRODUCT_TYPE,BUY_SELL,QUANTITY
ABC,D,13
REV,B,11
REV,B,5
REV,B,12
REV,C,9
""".strip())
您想要的逻辑似乎是:
df.with_columns(
pl.when(pl.col.PRODUCT_TYPE == "REV", pl.col.BUY_SELL == "B")
.then(pl.col.QUANTITY > 10)
.otherwise(True)
)
shape: (5, 3)
┌──────────────┬──────────┬──────────┐
│ PRODUCT_TYPE ┆ BUY_SELL ┆ QUANTITY │
│ --- ┆ --- ┆ --- │
│ str ┆ str ┆ bool │
╞══════════════╪══════════╪══════════╡
│ ABC ┆ D ┆ true │
│ REV ┆ B ┆ true │
│ REV ┆ B ┆ false │
│ REV ┆ B ┆ true │
│ REV ┆ C ┆ true │
└──────────────┴──────────┴──────────┘
放入Pandera:
import polars as pl
import pandera.polars as pa
from pandera.polars import PolarsData
df = pl.read_csv(b"""
PRODUCT_TYPE,BUY_SELL,QUANTITY
ABC,D,13
REV,B,11
REV,B,5
REV,B,12
REV,C,9
""".strip())
def check_quantity_filter(data: PolarsData) -> pl.LazyFrame:
return data.lazyframe.select(
pl.when(pl.col.PRODUCT_TYPE == "REV", pl.col.BUY_SELL == "B")
.then(pl.col.QUANTITY > 10)
.otherwise(True)
)
schema = pa.DataFrameSchema({
"QUANTITY": pa.Column(float, checks=[pa.Check(check_quantity_filter)], nullable=True)
})
try:
schema(df, lazy=True)
except pa.errors.SchemaErrors as exc:
failures = exc.failure_cases
print(failures)
它给出:
shape: (2, 6)
┌──────────────┬────────────────┬──────────┬───────────────────────┬──────────────┬───────┐
│ failure_case ┆ schema_context ┆ column ┆ check ┆ check_number ┆ index │
│ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- │
│ str ┆ str ┆ str ┆ str ┆ i32 ┆ i32 │
╞══════════════╪════════════════╪══════════╪═══════════════════════╪══════════════╪═══════╡
│ Int64 ┆ Column ┆ QUANTITY ┆ dtype('Float64') ┆ null ┆ null │
│ 5 ┆ Column ┆ QUANTITY ┆ check_quantity_filter ┆ 0 ┆ 2 │
└──────────────┴────────────────┴──────────┴───────────────────────┴──────────────┴───────┘