从 Kafaka 队列读取时我会丢失事件,因为即使禁用了
enable_auto_commit
,消费者也会在没有显式提交的情况下更新偏移量。
from kafka import KafkaClient, TopicPartition
topic_name = "my_topic"
consumer = KafkaConsumer(topic_name, group_id="me", enable_auto_commit=False)
for i, message in enumate(consumer):
if i == 5:
expected = message.offset
print(expected)
break
else:
consumer.commit()
for message in consumer:
value = message.offset
print(value)
consumer.commit()
break
产生
>>> 50078
>>> 50079
如果我不提交读取,下次开始消费消息时不应该再次读取吗?
看起来您正在使用
kafka-python
,我说得对吗?如果是这样,有一个 GitHub 问题 解释了解决方法,尽管我没有看到维护者提供的解决方案。因此,虽然我不知道您遇到此问题的原因,但基于该问题,我可以建议您至少提交一次消息。