How do I aggregate JSON data in a Kafka consumer using Python?


I am producing data to the Kafka topic Transactions, and the consumer receives records like these:

ConsumerRecord(topic='Transactions', partition=0, offset=3, timestamp=1591277946735, timestamp_type=0, key=None, value={'transaction_id': '9495601361', 'account_number': 14, 'transaction_reference': '20070', 'transaction_datetime': '2020-06-04T19:09:06.735129', 'amount': 260.93}, headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=160, serialized_header_size=-1)

ConsumerRecord(topic='Transactions', partition=0, offset=4, timestamp=1591277946736, timestamp_type=0, key=None, value={'transaction_id': '4952940859', 'account_number': 14, 'transaction_reference': '44291', 'transaction_datetime': '2020-06-04T19:09:06.736128', 'amount': 2.82}, headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=158, serialized_header_size=-1)

ConsumerRecord(topic='Transactions', partition=0, offset=5, timestamp=1591277946737, timestamp_type=0, key=None, value={'transaction_id': '0193362270', 'account_number': 12, 'transaction_reference': '96312', 'transaction_datetime': '2020-06-04T19:09:06.736128', 'amount': 766.95}, headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=160, serialized_header_size=-1)

The consumer code I have written so far is:

import json
from kafka import KafkaConsumer

# Decode each message value from UTF-8 JSON bytes into a Python dict.
consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                         auto_offset_reset='earliest',
                         value_deserializer=lambda m: json.loads(m.decode('utf-8')))
consumer.subscribe(['Transactions'])
for message in consumer:
    print(message)

I want to output tuples of the form (account_number, sum(amount)). How can I achieve this?

python apache-kafka kafka-consumer-api kafka-producer-api kafka-python
1 Answer

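I think a dictionary is more useful than tuples for grouping the data: key it by account_number and add each amount to the running total for that key. collections.defaultdict suits this well, because a missing key starts from a default value instead of raising KeyError.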
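
As a minimal sketch of that idea (not necessarily the exact code the answer had in mind), the snippet below sums amount per account_number with a defaultdict(float). The helper name aggregate_amounts and the SimpleNamespace stand-ins are assumptions made for illustration; the stand-ins only mimic the .value attribute of the ConsumerRecords shown in the question, so the example runs without a broker.

```python
from collections import defaultdict
from types import SimpleNamespace


def aggregate_amounts(messages):
    """Sum `amount` per `account_number` over an iterable of records.

    Each record is expected to expose a `.value` dict, as kafka-python's
    ConsumerRecord does once a JSON value_deserializer is configured.
    (Hypothetical helper, introduced only for this example.)
    """
    totals = defaultdict(float)  # unseen accounts start at 0.0
    for message in messages:
        record = message.value
        totals[record['account_number']] += record['amount']
    return totals


# Stand-ins for the three records in the question; only .value is used.
sample_messages = [
    SimpleNamespace(value={'account_number': 14, 'amount': 260.93}),
    SimpleNamespace(value={'account_number': 14, 'amount': 2.82}),
    SimpleNamespace(value={'account_number': 12, 'amount': 766.95}),
]

totals = aggregate_amounts(sample_messages)

# Convert the dict into the (account_number, sum(amount)) tuples requested.
result = sorted(totals.items())
print(result)
```

With a live consumer, the same loop body can go inside `for message in consumer:`. Note that iterating over a KafkaConsumer blocks waiting for new messages by default, so either print the running totals inside the loop or pass consumer_timeout_ms to the constructor so that iteration ends once the topic is idle. Float sums can carry small rounding errors; for monetary values, decimal.Decimal may be a better accumulator.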
