转储字典,其中字段包含已编码的 JSON 字符串

问题描述 投票:0回答:1

我使用其 avro 架构对字典进行 JSON 编码,使其成为字符串化 JSON:

import json
from datetime import datetime

import fastavro

message = {
    "name": "any",
    "ingestion_ts": datetime.utcnow(),
    "values": {
        "amount": 5,
        "countries": ["se", "nl"],
        "source": {
            "name": "web",
            "url": "whatever"
        }
    }
}
avro_schema = "" # import avro schema
fo = StringIO()
fastavro.json_writer(fo, avro_schema, [message])
message_str = fo.getvalue()

这正如预期的那样工作,并返回一个符合所提供模式的 JSON 编码字符串,如下所示:

'{"name": "any", "ingestion_ts": 1703192665965373, "values": {"amount": 5, "countries": ["se", "nl"], "source": {"name": "web", "url": "whatever"}}}'

但是,现在我需要将此 str 包装在

payload
键上的字典中,并将其发布到需要此模式的消息队列:

{
  "type": "record",
  "namespace": "CDCEvent",
  "name": "CDCEvent",
  "fields": [
    {
      "doc": "The system that generated the event",
      "type": "string",
      "name": "sys"
    },
    {
      "doc": "The operation performed on the event",
      "type": "string",
      "name": "op"
    },
    {
      "doc": "The content of the event",
      "type": "string",
      "name": "payload"
    }
  ]
}

所以我就这样做:

wrap = {
    "sys": "my_system",
    "op": "c",
    "payload": message_str
}
wrap_str = json.dumps(wrap)

问题是,由于

message_str
之前已经编码过,当我通过调用
json.dumps
再次对换行进行编码时,有效负载再次被编码,并且其中的双引号被转义,并且消息无法正确消费者解码:

'{"sys": "my_system", "op": "c", "payload": "{\\"name\\": \\"any\\", \\"ingestion_ts\\": 1703192665965373, \\"values\\": {\\"amount\\": 5, \\"countries\\": [\\"se\\", \\"nl\\"], \\"source\\": {\\"name\\": \\"web\\", \\"url\\": \\"whatever\\"}}}"}'

如何避免这种双重编码?我一直在尝试重新设计解决方案,但也许我已经很困惑并且错过了一种非常简单的方法。请记住,队列正在等待

payload
字段

的 str
python json avro
1个回答
0
投票

当你说消费者无法解码消息时,他们是如何解码的?如果他们从您的示例中获取

wrap_str
并应用以下转换,他们将收到原始消息:

wrapped_object = json.loads(wrap_str)

for record in fastavro.json_reader(StringIO(wrapped_object["payload"]), avro_schema):
    print(record)

© www.soinside.com 2019 - 2024. All rights reserved.