从 Kakfa 主题消费并附加到列表中

问题描述 投票:0回答:1

目前我正在尝试从一个主题中消费并附加来自一个主题的所有消息,但不幸的是最后的列表没有打印任何内容。

import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'random_user_fetching',
    bootstrap_servers='localhost:9092',
    max_poll_records=100,
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my_grp' 
)
print(consumer)
data_list=[]
try:
    for message in consumer:
        data = message.value
        data_list.append(data)
        consumer.commit()
        
except KeyboardInterrupt:
    pass
finally:
    pass

print(f"Processed messages: {len(data_list)}")
print(data_list)
consumer.close()

输出如下

<kafka.consumer.group.KafkaConsumer object at 0x7f1c3379c9d0>

有什么意见吗?

python-3.x apache-kafka kafka-consumer-api
1个回答
0
投票

代码看起来不错。您能否在循环中添加一些日志记录或打印语句,如下所示,以查看是否引发任何异常。这可以帮助您识别消息消费过程中是否存在任何问题。

import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'random_user_fetching',
    bootstrap_servers='localhost:9092',
    max_poll_records=100,
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my_grp' 
)
print(consumer)
data_list = []

try:
    for message in consumer:
        data = message.value
        data_list.append(data)
        print(f"Received message: {data}")
        consumer.commit()
        
except KeyboardInterrupt:
    pass
finally:
    pass

print(f"Processed messages: {len(data_list)}")
print(data_list)
consumer.close()
© www.soinside.com 2019 - 2024. All rights reserved.