我使用Spring Stream Cloud在Kafka上消费消息。当在kafka上产生消息时,所有消费者都受到打击。
但是kafka的文档说,通过使用组只有一个消费者消费消息。
这是我的消费者代码。
@EnableBinding(Sink.class)
public class Consumer2 {
@StreamListener(target = Sink.INPUT)
public void consume(String message) {
System.out.println("33333");
}
@StreamListener(target = Sink.INPUT)
public void consume1(String message) {
System.out.println("444444");
}
}
}
这是我的配置但是我的两个方法都叫:(
spring:
cloud:
stream:
default-binder: kafka
kafka:
binder:
brokers:
- localhost:9092
bindings:
input:
binder: kafka
destination: abbas
content-type: text/plain
group: input-group-1
output:
binder: kafka
destination: abbas
group: output-group-1
content-type: text/plain
使用该配置,您只有1个使用者(SINK.INPUT)
,而不是2个使用者(@StreamListener
不是使用者,它是用于处理入站消息的模型)
这就是为什么spring将入站消息路由到具有相同接收器的两个@StreamListener
批注。