我们知道,我们可以发送带有kafka生产者的密钥,该密钥在内部进行散列,以查找主题数据中的哪个分区。我有一个生产者,我正在其中以JSON格式发送数据。
kafka-console-producer --broker-list 127.0.0.1:9092 --topic USERPROFILE << EOF
{"user_id" : 100, "firstname":"Punit","lastname":"Gupta", "countrycode":"IN", "rating":4.9 }
{"user_id" : 101, "firstname":"eli","lastname":"eli", "countrycode":"GB", "rating":3.0 }
EOF
现在,我想在发送数据时使用“国家/地区代码”作为我的密钥。在普通定界数据中,我们可以指定2个参数:
--property "parse.key=true"
--property "key.separator=:
但是发送JSON sata时如何做。
如果要实现此功能我必须编写一些函数类的东西,我正在使用Confluent的python API for Kafka,如果可以使用python来说,我将非常感激。
JSON只是一个字符串。控制台生产者不会解析JSON,只有Avro控制台生产者会解析。
我会避免使用key.separator=:
,因为JSON包含:
。您可以改用|
字符,然后直接键入
countrycode|{"your":"data"}
在Python中,为the produce function takes a key, yes。您可以像这样解析您的数据,以便为键提取一个值。
key = 'countrycode'
records = [{"user_id" : 100, "firstname":"Punit","lastname":"Gupta", key:"IN", "rating":4.9 },
{"user_id" : 101, "firstname":"eli","lastname":"eli", key:"GB", "rating":3.0 }
]
import json
for r in records:
producer.produce('topic', key=r[key], value=json.dumps(r))
# first record will send a record containing ('IN', { ... 'countrycode':'IN'})