如何在Python中产生JSON格式的Kafka消息

问题描述 投票:0回答:1

如何删除引号并以原始格式发送数据原始的JSON格式为:

{
  "@timestamp": "2020-06-02T09:38:03.183186Z"
}

此数据在另一个主题中

"{\"@timestamp\": \"2020-05-25T17:40:47.582778Z\"}"

这是在服务器之间发送数据的代码

def parse(d):   
    if str(type(d)) == "<class 'dict'>":       
        return (r)
    return -1

producer = KafkaProducer(bootstrap_servers=param["BOOTSTRAP_SERVERS"],
                                 value_serializer=lambda x: dumps(x).encode('utf-8'))  # utf-8
consumer = KafkaConsumer(bootstrap_servers=param["BOOTSTRAP_SERVERS"]+'1',
                                 auto_offset_reset=param["AUTO_OFFSET_RESET"],
                                 consumer_timeout_ms=param["CONSUMER_TIMEOUT_MS"],
                                 enable_auto_commit=False,
                                 auto_commit_interval_ms=60000,
                                 group_id=param["GROUP_ID"],
                                 client_id=param["CLIENT_ID"]
                                 )
consumer.subscribe([param["TOPIC_IN"]])
 while True:
      num_rows = 0
      for msg in consumer:
          num_rows = num_rows + 1
          m = json.loads(msg.value)
          j = parse(m)
          if j != -1:
             data = json.dumps(j)
             producer.send(param["TOPIC_OUT"], value=data)
python apache-kafka kafka-producer-api
1个回答
0
投票

您当前正在将值序列化为字符串。如果要使用JSON而不是字符串,则需要正确序列化值。


以下应完成的技巧:

import json  

producer = KafkaProducer(
    bootstrap_servers='mykafka-broker',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
© www.soinside.com 2019 - 2024. All rights reserved.