在文档中提到生产者重试基于max_retries
将消息发送到队列。
所以我关闭了Kafka,然后尝试了我的制作人。我收到此错误
Fetching cluster metadata from kafka://localhost:9092
[topic_metadata] Opening connection to localhost:9092 with client id MYCLIENTID
ERROR -- : [topic_metadata] Failed to connect to localhost:9092: Connection refused
DEBUG -- : Closing socket to localhost:9092
ERROR -- : Failed to fetch metadata from kafka://localhost:9092
Completed 500 Internal Server Error in 486ms (ActiveRecord: 33.9ms)
这很有意义,但是retries
之后再也不会发生。我已经从内到外阅读了文档,但无法弄清楚retries
实际如何触发?
这是我的代码:
def self.deliver_message(kafka, message, topic, transactional_id)
producer = kafka.producer(idempotent: true,
transactional_id: transactional_id,
required_acks: :all,
max_retries: 5,
retry_backoff: 5)
producer.produce(message, topic: topic)
producer.deliver_messages
end
链接到文档:
https://www.rubydoc.info/gems/ruby-kafka/Kafka/Producer#initialize-instance_method
谢谢你。
重试是基于生产者回调引发的异常类型。根据Callback Docs,在回调期间可能发生以下异常:
在处理此记录期间引发的异常。如果没有发生错误,则为Null。可能引发的异常包括:
不可恢复的异常(致命,永远不会发送消息):
可恢复的异常(瞬态,可以通过增加#.retries来覆盖:]
完全关闭Kafka看起来像是不可恢复的异常。