在Kafka / RabbitMQ中的每个消息确认

问题描述 投票:1回答:4

我们有一个工作的rabbitmq。实现,由于数量,我们计划切换到kafka。

我有一点怀疑。

在RabbitMQ中,当消费者消费来自Q的消息时,消息进入不同的阶段,未被打包的阶段。客户端/消费者需要一些时间来处理消息,在成功处理后,它向Q发送确认并且消息从Q中删除。如果不成功,在定义的时间段之后如果Q没有得到确认,则消息是附在Q的末尾。通过这种方式,我们不会丢失任何消息。

凭借我在Kafka的一点知识,我明白如果例如消息100没有被成功处理,则偏移量不会增加,但如果消息101被成功处理,它将会增加。所以我丢失了消息100。

有没有办法保证不会丢失任何消息。

apache-kafka rabbitmq producer-consumer
4个回答
1
投票

我也遇到了同样的问题。如果我想以简单的方式放置,RabbitMQ会保留每个

  1. 已发布但未消费
  2. 已发布,已消费且未确认的消息。

卡夫卡没有,所以你不能把它做好,你必须自己实施。

虽然有可用的选项,使用kmq,性能将低于50%,看看

https://softwaremill.com/kafka-with-selective-acknowledgments-performance/


1
投票

Kafka不会删除主题中的消息,除非它到达其中一个log.retention.bytes log.retention.hours log.retention.minutes log.retention.ms configs。因此,如果偏移量增加,则不会丢失先前的消息,您只需将偏移量更改为所需的位置即可。


0
投票

除非您轮询新邮件,否则不会增加您的邮件偏移量。所以你必须担心重新处理你的消息。

如果要将数据处理结果存储到Kafka群集,可以使用transaction feature of Kafka。这样您就可以支持一次交付。您的所有更改都将被保存,或者不会存储任何更改。

另一种方法是使您的处理方案具有幂等性。您将为Kafka中的每封邮件分配唯一的ID。处理消息时,将ID存储在数据库中。崩溃后,通过查看数据库检查您的消息ID是否已处理。


0
投票

您应该阅读一下卡夫卡的消息消费方式。这里是官方Kafka文档消费者部分的链接:https://kafka.apache.org/documentation/#theconsumer

基本上,在Kafka中,消息只在经过足够的时间后被删除,并且使用log.retention.hourslog.retention.minuteslog.retention.ms配置,就像@Amin所说的那样。

在Kafka中,任何数量的消费者都可以随时开始使用来自任何主题的消息,无论其他消费者是否已经从同一主题消费。 Kafka使用存储在Kafka本身的偏移量,在每个主题/分区上跟踪每个消费者的位置。因此,如果您的消费者需要消费消息100,就像您在问题中描述的那样,您可以简单地“回放”到所需的消息,并再次开始正常消费。如果您以前使用它,或者其他消费者是否正在阅读该主题,则无关紧要。

来自官方Kafka文档:

消费者可以故意回退到旧的偏移并重新使用数据。这违反了队列的通用合同,但对许多消费者来说,这是一个必不可少的功能。例如,如果消费者代码有错误并且在消费了某些消息后被发现,则消费者可以在修复错误后重新使用这些消息。

© www.soinside.com 2019 - 2024. All rights reserved.