如何制作可重启的制作人?

问题描述 投票:0回答:1

最新版本的kafka支持完全一次语义(EoS)。为了支持这一概念,每条消息都会添加额外的详细信息。这意味着你的消费者;如果您打印消息的偏移,它们不一定是顺序的。这使得更难以轮询主题以读取最后提交的消息。

在我的情况下,消费者印刷了这样的东西

Offset-0 0
Offset-2 1
Offset-4 2

问题:为了写重启能力的引导者;我轮询主题并阅读上一条消息的内容。在这种情况下;最后一条消息将偏移#5,这不是有效的消费者记录。因此,我在代码中看到了错误。

我可以使用Getting the last message sent to a kafka topic提供的解决方案。唯一的问题是,而不是使用consumer.seek(partition,last_offset = 1);我会使用consumer.seek(partition,last_offset-2)。这可以立即解决我的问题,但它不是一个理想的解决方案。

对于用Java编写的消费者来说,最可靠和最好的解决方案是什么?要么

是否可以使用local state-store作为分区?要么

存储最后一条消息以抵御生产者故障的最佳推荐方法是什么?要么

kafka连接器是否可重启?是否有任何特定的API可用于使生产者重新启动?

仅供参考 - 我不是在寻找快速解决方案

apache-kafka kafka-consumer-api
1个回答
0
投票

就我而言,多个生产者将数据推送到一个大的主题。因此,阅读整个主题将是噩梦。

我找到的解决方案是维护另一个主题,即生产者可以存储元数据的“P1_Track”。在一个事务中,生产者将数据发送到一个大主题和P1_Track。

当我重新启动生产者时,它将读取P1_Track并找出从哪里开始。

考虑将最后提交的消息存储在数据库中并在生产者进程重新启动时使用它。

© www.soinside.com 2019 - 2024. All rights reserved.