所以,我们有一个生产者可以正常运行几个月没有错误,但突然开始出错
'BufferError: Local: Queue full'
我最初遇到过这个问题,然后通过阅读文档和 S.O 帖子,了解到我们需要在生产后立即调用
poll
,在许多地方也有很好的解释here.
for clientvaluescore in clientvaluescores:
kafka_producer.produce(topic=kafkaconfig['topic_name'],value=clientvaluescore,on_delivery=kafka_delivery_report)
kafka_producer.poll(0)
kafka_producer.flush()
因此,我添加了轮询,这使错误消失了,但 3 个月后,我再次看到同样的错误。这一次,我遇到了this,所以我也添加了异常处理,以及
lingering.ms
。然而,这一次,虽然我没有再收到 BufferError,但我没有看到消息发布到主题。这表明发布一直在默默地失败。
for churnscore in churnscores:
while True:
try:
kafka_producer.produce(topic=kafkaconfig['topic_name'],value=churnscore,on_delivery=kafka_delivery_report)
kafka_producer.poll(0)
break
except BufferError:
kafka_producer.poll(1)
kafka_producer.flush()
我还应该做什么?
你能解决问题吗?