Kafka 消费者在没有交易的情况下重新处理

问题描述 投票:0回答:1

如果有一个从分区读取消息的消费者应用程序进行一些数据转换并将其存储在数据库等外部数据存储中或调用休息服务,那么在出现以下情况时处理消息重新处理的可能模式或解决方案是什么消费者失败还是重新平衡?它只处理一次,但当我处理外部服务时,启用事务对我没有帮助。

apache-kafka kafka-consumer-api
1个回答
0
投票

实际上,这个问题并不是 Kafka 特有的,因为它是幂等处理消息的普遍问题。唯一与 Kafka 相关的是在正确的时间提交偏移量。

那么可能的方法是什么?

首先,如果将消息的数据写入数据库,通常非常简单。如果您要将消息写入键值存储,那么通常是通过内置的“upsert”操作来完成的,该操作已经是幂等的,并且如果再次应用相同的数据,则不会产生任何影响。如果您通过 SQL 将数据插入经典 RDBMS,请考虑数据中的唯一字段并在数据库中添加唯一约束。如果还不存在,最坏的情况下可能是消息分区+偏移量。如果消息重复,您可以忽略相关的 SQL 错误或使用

INSERT ... ON CONFLICT (unique_field) DO NOTHING
子句。

如果使用休息服务,您必须以某种方式使其方法具有幂等性。有时意外调用某个方法两次已经可以了。否则,您必须添加一个“幂等键”来休息调用并在服务实现中处理它们。处理意味着检查具有该幂等性密钥的消息是否已经被处理。您可以将分区+偏移量的串联作为幂等键,

© www.soinside.com 2019 - 2024. All rights reserved.