目前我正在尝试从一个主题中消费并附加来自一个主题的所有消息,但不幸的是最后的列表没有打印任何内容。
import json
from kafka import KafkaConsumer
consumer = KafkaConsumer(
'random_user_fetching',
bootstrap_servers='localhost:9092',
max_poll_records=100,
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='my_grp'
)
print(consumer)
data_list=[]
try:
for message in consumer:
data = message.value
data_list.append(data)
consumer.commit()
except KeyboardInterrupt:
pass
finally:
pass
print(f"Processed messages: {len(data_list)}")
print(data_list)
consumer.close()
输出如下
<kafka.consumer.group.KafkaConsumer object at 0x7f1c3379c9d0>
有什么意见吗?
代码看起来不错。您能否在循环中添加一些日志记录或打印语句,如下所示,以查看是否引发任何异常。这可以帮助您识别消息消费过程中是否存在任何问题。
import json
from kafka import KafkaConsumer
consumer = KafkaConsumer(
'random_user_fetching',
bootstrap_servers='localhost:9092',
max_poll_records=100,
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='my_grp'
)
print(consumer)
data_list = []
try:
for message in consumer:
data = message.value
data_list.append(data)
print(f"Received message: {data}")
consumer.commit()
except KeyboardInterrupt:
pass
finally:
pass
print(f"Processed messages: {len(data_list)}")
print(data_list)
consumer.close()