JSON列为kafka生产者中的键

问题描述 投票:0回答:1

我们知道,我们可以发送带有kafka生产者的密钥,该密钥在内部进行散列,以查找主题数据中的哪个分区。我有一个生产者,我正在其中以JSON格式发送数据。

kafka-console-producer --broker-list 127.0.0.1:9092 --topic USERPROFILE << EOF 
{"user_id" : 100, "firstname":"Punit","lastname":"Gupta", "countrycode":"IN", "rating":4.9 }
{"user_id" : 101, "firstname":"eli","lastname":"eli", "countrycode":"GB", "rating":3.0 }
EOF

现在,我想在发送数据时使用“国家/地区代码”作为我的密钥。在普通定界数据中,我们可以指定2个参数:

--property "parse.key=true" 
--property "key.separator=:

但是发送JSON sata时如何做。

如果要实现此功能我必须编写一些函数类的东西,我正在使用Confluent的python API for Kafka,如果可以使用python来说,我将非常感激。

python apache-kafka kafka-producer-api confluent-kafka
1个回答
0
投票

JSON只是一个字符串。控制台生产者不会解析JSON,只有Avro控制台生产者会解析。

我会避免使用key.separator=:,因为JSON包含:。您可以改用|字符,然后直接键入

countrycode|{"your":"data"}

在Python中,为the produce function takes a key, yes。您可以像这样解析您的数据,以便为键提取一个值。

key = 'countrycode'
records = [{"user_id" : 100, "firstname":"Punit","lastname":"Gupta", key:"IN", "rating":4.9 },
           {"user_id" : 101, "firstname":"eli","lastname":"eli", key:"GB", "rating":3.0 }
]

import json
for r in records:
    producer.produce('topic', key=r[key], value=json.dumps(r))
    # first record will send a record containing ('IN', {  ... 'countrycode':'IN'})
© www.soinside.com 2019 - 2024. All rights reserved.