避免Kafka生产者信息的重复

问题描述 投票:0回答:1

我使用的是 KafkaTemplate 从Spring Boot.Java 8

我的主要目的是,消费者不应该消费两次消息。

1) 调用一个表获取100行并发送给kafka。

2) 假设我处理了70行(我得到了成功的ACK),然后Kafka宕机了(Kafka在RETRY机制时间内无法恢复)

所以,当我重启spring boot应用时,如何确保这70条消息不会再次发送。

一种方法是,我可以在DB表的消息中设置一个标志。is_sent = Y or N.

有没有其他有效的方法?

apache-kafka spring-kafka kafka-producer-api
1个回答
0
投票

对于Kafka,我看到的实现是存储id的指针来跟踪你在主题中的位置,并使用某种分布式存储来在集群级别上跟踪这些。我在那里没有做太多的工作,所以我将尝试提供一个我们用SQS来检测dup的解决方案。很可能Kafka有一个更好的解决方案,这个方案可以解决重复的问题,只是想在那里补充一下,这样你也可以看看备选的解决方案。

我在使用AWS SQS进行点对点消息传递用例时,也遇到了同样的问题,因为它提供了至少一次的交付保证与一次且只有一次。

我们最终使用Redis及其分布式锁定策略来解决这个问题。我这里有一篇文章 https:/angularthinking.blogspot.com.

高层次的方法是创建一个分布式的锁,把一个条目放在缓存中,并为你的用例提供适当的TTL。我们使用LUA脚本来做一个putIfNotExists()的方法,如上面的博客所示。规模是我们关注的问题之一,通过上面的实现,我们能够每秒处理上万条消息,在SQS中没有任何问题,redis的规模也非常好。我们必须根据吞吐量和缓存增长将TTL调整到一个最佳值。我们确实有好处,复制窗口是24小时或更少,所以取决于redis的这个决定是确定的。如果你有更长的窗口,复制可能发生在几天或几个月,redis选项可能不适合。

我们也看了DynamoDB来实现putIfNotExists(),但是redis对于这个用例似乎更有性能,特别是它的原生putIfNotExists使用LUA脚本实现。

祝你搜索成功。


0
投票

我会使用 JDBC源连接器 (取决于你当前使用的数据库)与 Kafka连接 能正确处理这种情况。


万一你还想写自己的制作人。本节 的Kafka FAQ应该很有用。

如何从Kafka中获得精确的一次信息传递?

exactly-once语义有两部分:避免数据生产过程中的重复和避免数据消费过程中的重复。

有两种方法可以在数据生产过程中获得精确的一次语义。

  1. 每一个分区使用一个单一的写入器 每当你收到一个网络错误时 检查该分区的最后一条信息 看看你的最后一次写入是否成功?
  2. 在消息中包含一个主键(UUID什么的),并在消费者上进行重复复制。

如果你做了其中的一件事,Kafka托管的日志将是无重复的。不过,读取无重复也要靠消费者的一些配合。如果消费者定期检查点位置,那么如果它失败了,重新启动,它将从检查点的位置重新启动。因此如果数据输出和检查点不是原子式写入的,那么这里也有可能出现重复的情况。这个问题是你的存储系统所特有的。例如,如果你使用的是数据库,你可以在一个事务中把这些一起提交。LinkedIn写的HDFS加载器Camus就为Hadoop加载做了这样的事情。另一种不需要事务的替代方法是将偏移量与加载的数据一起存储,并使用topicpartitionoffset组合进行重复数据删除。

我认为有两个改进可以使这一点变得更加简单。

  1. 生产者同位素可以自动完成,而且成本更低,可以在服务器上选择性地集成支持。
  2. 现有的高级消费者并没有暴露出很多更精细的偏移控制(例如重置你的位置)。我们将尽快解决这个问题。
© www.soinside.com 2019 - 2024. All rights reserved.