如何检查Ruby-Kafka重试是否有效?

问题描述 投票:0回答:1

在文档中提到生产者重试基于max_retries将消息发送到队列。

所以我关闭了Kafka,然后尝试了我的制作人。我收到此错误

Fetching cluster metadata from kafka://localhost:9092
[topic_metadata] Opening connection to localhost:9092 with client id MYCLIENTID
ERROR -- : [topic_metadata] Failed to connect to localhost:9092: Connection refused
DEBUG -- : Closing socket to localhost:9092
ERROR -- : Failed to fetch metadata from kafka://localhost:9092
Completed 500 Internal Server Error in 486ms (ActiveRecord: 33.9ms)

这很有意义,但是retries之后再也不会发生。我已经从内到外阅读了文档,但无法弄清楚retries实际如何触发?

这是我的代码:

 def self.deliver_message(kafka, message, topic, transactional_id)
      producer = kafka.producer(idempotent: true,
                                transactional_id: transactional_id,
                                required_acks: :all,
                                max_retries: 5,
                                retry_backoff: 5)
      producer.produce(message, topic: topic)
      producer.deliver_messages
    end

链接到文档:

https://www.rubydoc.info/gems/ruby-kafka/Kafka/Producer#initialize-instance_method

谢谢你。

ruby-on-rails ruby apache-kafka kafka-producer-api ruby-kafka
1个回答
1
投票

重试是基于生产者回调引发的异常类型。根据Callback Docs,在回调期间可能发生以下异常:

在处理此记录期间引发的异常。如果没有发生错误,则为Null。可能引发的异常包括:

不可恢复的异常(致命,永远不会发送消息):

  • InvalidTopicException
  • OffsetMetadataTooLargeException
  • RecordBatchTooLargeException
  • RecordTooLargeException
  • UnknownServerException

可恢复的异常(瞬态,可以通过增加#.retries来覆盖:]

  • CorruptRecordException
  • InchvalidMetadataException
  • NotEnoughReplicasAfterAppendException
  • NotEnoughReplicasException
  • OffsetOutOfRangeException
  • TimeoutException
  • UnknownTopicOrPartitionException

完全关闭Kafka看起来像是不可恢复的异常。

© www.soinside.com 2019 - 2024. All rights reserved.