I'm JSON-encoding a dictionary using its Avro schema, turning it into a stringified JSON:
import json
from datetime import datetime
from io import StringIO  # needed for the in-memory text buffer below

import fastavro

message = {
    "name": "any",
    "ingestion_ts": datetime.utcnow(),
    "values": {
        "amount": 5,
        "countries": ["se", "nl"],
        "source": {
            "name": "web",
            "url": "whatever"
        }
    }
}
avro_schema = ""  # placeholder: load the Avro schema for the message here
fo = StringIO()
fastavro.json_writer(fo, avro_schema, [message])  # write the record as Avro JSON
message_str = fo.getvalue()
This works as expected and returns a JSON-encoded string that conforms to the provided schema, like this:
'{"name": "any", "ingestion_ts": 1703192665965373, "values": {"amount": 5, "countries": ["se", "nl"], "source": {"name": "web", "url": "whatever"}}}'
However, I now need to wrap this str in a dictionary under the payload key and publish it to a message queue that expects this schema:
{
    "type": "record",
    "namespace": "CDCEvent",
    "name": "CDCEvent",
    "fields": [
        {
            "doc": "The system that generated the event",
            "type": "string",
            "name": "sys"
        },
        {
            "doc": "The operation performed on the event",
            "type": "string",
            "name": "op"
        },
        {
            "doc": "The content of the event",
            "type": "string",
            "name": "payload"
        }
    ]
}
So I just do this:
wrap = {
    "sys": "my_system",
    "op": "c",
    "payload": message_str
}
wrap_str = json.dumps(wrap)
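The problem is that, because message_str was already encoded, encoding the wrapper again with json.dumps encodes the payload a second time. The double quotes inside it get escaped, and the consumer cannot decode the message correctly: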
'{"sys": "my_system", "op": "c", "payload": "{\\"name\\": \\"any\\", \\"ingestion_ts\\": 1703192665965373, \\"values\\": {\\"amount\\": 5, \\"countries\\": [\\"se\\", \\"nl\\"], \\"source\\": {\\"name\\": \\"web\\", \\"url\\": \\"whatever\\"}}}"}'
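How can I avoid this double encoding? I've been trying to redesign the solution, but maybe I'm just confused and missing a very simple approach. Keep in mind that the queue expects a str for the payload field.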
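When you say the consumer cannot decode the message, how are they decoding it? If they take wrap_str from your example and apply the following transformation, they will get the original message back: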
wrapped_object = json.loads(wrap_str)  # undo the outer JSON encoding
for record in fastavro.json_reader(StringIO(wrapped_object["payload"]), avro_schema):
    print(record)
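To check that the escaped quotes are ordinary JSON string escaping rather than corruption, here is a minimal stdlib-only sketch. It assumes a hand-written, shortened message_str in place of the real fastavro output, so it runs without the Avro schema. It shows that one json.loads on the wrapper returns the payload string unchanged, and that the payload can then be parsed a second time.

```python
import json

# Stand-in for the string produced by fastavro.json_writer (an assumption, so
# that this sketch does not need the Avro schema).
message_str = '{"name": "any", "ingestion_ts": 1703192665965373, "values": {"amount": 5}}'

wrap = {"sys": "my_system", "op": "c", "payload": message_str}
wrap_str = json.dumps(wrap)

# print() shows the text that is actually sent: each inner quote is escaped
# once (\"). The doubled backslashes (\\") only appear in the repr of the str.
print(wrap_str)

# Consumer side: the first json.loads undoes the escaping of the wrapper...
decoded = json.loads(wrap_str)
assert decoded["payload"] == message_str  # payload is the original str, unchanged

# ...and a second parse turns the payload string into a dict.
inner = json.loads(decoded["payload"])
print(inner["values"]["amount"])
```

If the consumer calls json.loads only once and then treats payload as a dict, that would be one possible place where decoding breaks, since the payload still needs its own parse (json.loads here, or fastavro.json_reader as above).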