考虑以下名为
TableConfigs
的类的简单示例:
import pydantic
from enum import Enum
class TimeUnit(str, Enum):
days = "days"
hours = "hours"
minutes = "minutes"
seconds = "seconds"
class TableNames(str, Enum):
surname = "surname"
weather = "weather"
traffic = "traffic"
class TimeQuantity(pydantic.BaseModel):
value: int
unit: TimeUnit
class TableConfig(pydantic.BaseModel):
on: list[str]
time_quantities: list[TimeQuantity]
class TableConfigs(pydantic.BaseModel):
values: dict[TableNames, TableConfig]
当我使用 pydantic 时,我可以轻松地反序列化/序列化该类的实例:
tc = TableConfigs(
values={
"weather": TableConfig(
on=["city"],
time_quantities=[
TimeQuantity(value=60, unit=TimeUnit.days),
TimeQuantity(value=7, unit=TimeUnit.days),
TimeQuantity(value=1, unit=TimeUnit.seconds),
],
)
}
)
tc_copy = TableConfigs(**tc.model_dump())
作为 model_dump 的结果,json 表示为我所说的压缩留下了很大的空间
{'values':
{
<TableNames.weather: 'weather'>:
{
'on': ['city'],
'time_quantities': [
{'value': 60, 'unit': <TimeUnit.days: 'days'>
},
{'value': 7, 'unit': <TimeUnit.days: 'days'>
},
{'value': 1, 'unit': <TimeUnit.seconds: 'seconds'>
}
]
}
}
}
可以在不丢失信息的情况下写成(编码):
{'tables': 'weather:city;60d,7d,1s'}
这可以通过添加
field_serializer
和 model_serializer
来实现:
class TableConfigs(pydantic.BaseModel):
values: dict[TableNames, TableConfig]
@pydantic.field_serializer("values")
def serialize_values(self, values: dict[TableNames, TableConfig]) -> str:
s = ""
for i, table_name in enumerate(values):
if i > 0:
s += "#"
s += table_name + ":"
s += ",".join(values[table_name].on) + ";"
for time_quantity in values[table_name].time_quantities:
s += str(time_quantity.value) + time_quantity.unit + ","
s = s[:-1]
s = (
s.replace("days", "d")
.replace("hours", "h")
.replace("minutes", "m")
.replace("seconds", "s")
)
return s
@pydantic.model_serializer
def ser_model(self) -> dict[str, Any]:
return {"tables": f"{self.serialize_values(self.values)}"}
tc = TableConfigs(
values={
"weather": TableConfig(
on=["city"],
time_quantities=[
TimeQuantity(value=60, unit=TimeUnit.days),
TimeQuantity(value=7, unit=TimeUnit.days),
TimeQuantity(value=1, unit=TimeUnit.seconds),
],
)
}
)
print(tc.model_dump())
打印
{'tables': 'weather:city;60d,7d,1s}
但是我现在怎样才能再次反序列化呢?我想我必须编写一个函数,该函数采用压缩形式并将其转换为字典,然后将其传递给用于反序列化的常规 pydantic 函数。
正如 @zvone 在评论中所说,如果您想占用更少的带宽或磁盘空间,只需在最终的 JSON 文件上应用可从 Python 本身轻松获得的知识压缩算法即可。这些压缩接近最大理论水平,并且经过数十年的强化以涵盖所有边缘情况以及安全方面:
import zlib
...
compressed_bytes = zlib.compress(json_string.encode("utf-8"))
# And to decompress the binary data:
json_data = zlib.decompress(compressed_bytes).decode("utf-8")
另一方面,如果您想要一个更紧凑的文件,可以在同一屏幕实时状态下打包更多信息,然后它变得更适合人工编辑,那么您想要实现的目标可能会有意义。
另一方面,创建这样的“临时”额外编码意味着您将失去对 JSON 数据的所有回显系统的支持 - 您基本上是在 json 之上使用另一种编码来处理类中的数据。
因此,与 Python stdlib 的数据模型不同,Pydantic 不会为您的模型生成
__init__
函数:然后您可以编写一个 __init__
它将您的自定义字符串解析为 Pydanditic 所需的标准 JSON 格式,并使用以下命令调用 super().__init__()
这个“预反序列化”的数据。
您需要为
__init__
类编写一个 TableConfigs
方法,该方法验证 values
字段获得的值是字典还是字符串。如果它是一个字符串,请调用另一个方法来“解压缩”它,使用与压缩类似的逻辑(抱歉,我不会为您编写该代码,但我想您可能可以做到。不要忘记为模型的编码和解码编写单元测试)。
...
class TableConfigs(pydantic.BaseModel):
values: dict[TableNames, TableConfig]
def __init__(self, **kwargs):
if "values" in kwargs and isinstance(kwargs["values"], str):
kwargs["values"] = self.deserialize_values(kwargs["values"])
super().__init__(**kwargs)
def deserialize_values(self, values):
tb_name, fields = values.split(":")
fields = fields.split(";")
# follow code for reconstructiong the json for each table, as encoded:
new_values = {}
if tb_name == "weather":
new_values["on"] = fields[0]
time_qtys = fields[1].split(",")
new_time_qtys = []
for time_qty in time_qtys:
new_time_qtys.append(...) # <- your decoding code goes here
new_values["time_quantities"] = new_time_qtys
...
elif tbname == "whatever":
...
return new_values
@pydantic.field_serializer("values")
def serialize_values(self, values: dict[TableNames, TableConfig]) -> str:
s = ""
...
...