我有一个流“dest”,我想要两个消费者,并且两个流应该使用相同的消息。
但是,如果一个流消耗了该消息,则该消息将丢失并且不会进入第二个流。我不想排队,但这里有话题。
我尝试为 2 个不同的流创建 2 个不同的组,但没有帮助。 这里要如何配置呢?
spring:
cloud:
stream:
bindings:
input:
group: group1
destination: dest
content-type: application/json
spring:
cloud:
stream:
bindings:
input:
group: group2
destination: dest
content-type: application/json
spring:
cloud:
stream:
bindings:
output:
destination: dest
content-type: application/json
您需要为该用例声明两个不同的绑定
input1
和 input2
:
spring:
cloud:
stream:
bindings:
input1:
group: group1
destination: dest
content-type: application/json
spring:
cloud:
stream:
bindings:
input2:
group: group2
destination: dest
content-type: application/json
这样,绑定器将创建两个不同的使用者,因此,它们都将使用 Kinesis 流中的相同记录。
当然,您需要为每个绑定目标有两个不同的
@StreamListener
配置。
该解决方案是否也适用于使用相同消息的不同微服务?