如何使用弹簧云流和kafka活页夹确定一些消息的优先级

问题描述 投票:0回答:1

TLDR:我希望能够在同一主题中的其他人之前处理一些紧急消息。

更多解释:

我正在研究一个相当大的应用程序,它使用许多依赖spring-cloud-stream的弹簧微服务和kafka-binder来完成某些任务。

计划任务可能会产生大量事件。当消耗这些事件时,可能产生并发送中间事件,并且可能花费相当长的时间直到消耗所有初始和中间消息并且完成所有工作。

在此期间,如果有人想要手动发送相同类型的事件,则只有在完成其他所有操作后才会对其进行排队和处理。

在所有其他事件发生之前,我被要求查看是否无法优先处理这些“手动事件”。

通常在这种情况下,你会看到人们主张复制所有主题(例如topic-normaltopic-urgent),复制@StreamListener。这似乎是一个相当复杂的选项,因为我们在这里涉及许多不同的主题。我们还有一些kafka-streams聚合,需要重写才能将其考虑在内。

与一位开发人员一起,我们想到的可能是每个主题的消费者群体(consumerGroupNormalconsumerGroupUrgentusing)都需要重复紧急消息。然后我会为每条消息添加一个优先级标头,复制StreamListener并仅在其中一个监听器中使用带有priority: urgent标头的消息,并且只在另一个监听器中使用带有priority:normal的消息。

我也可能会创建一个ChannelInterceptorAdapter来自动添加和读取这个“优先级”标题(a bit like Sleuth does it)。

但我仍然需要为绑定器添加大量配置才能编写,并且需要复制监听器,添加检查标头并执行或不执行监听器代码的小代码。

总而言之,对于看起来像是一个非常常见且简单的用例而言,这看起来像是一些非常有用的工作,这对其他人有用,所以我想知道是否有更简单的方法来做到这一点?

spring-cloud-stream spring-kafka
1个回答
1
投票

在每个主题上使用两个使用者组几乎是唯一的方法,并且有两个监听器。

但是,您不需要自定义通道拦截器;你可以使用像condition这样的"headers['foo'].equals('urgent')"属性。

© www.soinside.com 2019 - 2024. All rights reserved.