如何更改Kafka拓扑的消费者偏移量?

问题描述 投票:0回答:1

如今,我正在开发一个使用Kafka Streams处理消息的项目。我们的消息由两个标识符组成,其中一个用于用户的标识符,第二个用于用户所属的消息列表的标识符。

该应用程序的故事是客户想要通过我的应用程序将推送消息发送到他们的用户列表。因此,客户端在向用户发送推送消息之前以及之后开始发送之前,从外部源加载用户列表。但客户也希望随时停止正在进行的发送。

我的问题出现在这一点上。我想停止发送推送消息而不通过其流api消耗来自kafka的所有消息。我用谷歌搜索流的源的偏移量作为最新,但找不到合适的方法来完成它。

我怎么能完成它?谢谢。

java apache-kafka apache-kafka-streams
1个回答
1
投票

Kafka Streams不支持此功能。如果你真的需要这样做,你可以停止Kafka Streams并使用bin/kafka-consumer-groups.sh手动搜索到Kafka Streams应用程序(注意:application.id == group.id)。之后您可以重新启动Kafka Streams。

顺便说一句:在即将发布的1.1版本中,bin/kafka-streams-application-reset.sh也将允许操纵补偿。我希望我们还设法制作内部使用的类StreamsResetter公共API - 这将允许您在我们的应用程序中以编程方式执行此操作,而无需回退到命令行。

© www.soinside.com 2019 - 2024. All rights reserved.