使用 Spring Cloud Stream 和 Kafka Streams 绑定器,我想在另一个函数中处理函数的输出,如下所示:
@Bean
public Function<KStream<String, Double>, KStream<String, Double>> sqrt() {
return numbers -> numbers.mapValues(Math::sqrt);
}
@Bean
public Consumer<KStream<String, Double>> log() {
return sqrt -> sqrt.foreach((key, value) -> log.info("{}: {}", key, value));
}
其中
sqrt()
输出数字的平方根,然后用 log()
记录。 application.yaml
因此看起来像这样:
spring:
cloud:
stream:
function:
bindings:
sqrt-in-0: numbers
sqrt-out-0: sqrt-numbers
log-in-0: sqrt-numbers
kafka:
streams:
bindings:
sqrt:
consumer:
application-id: sqrtApplicationId
log:
consumer:
application-id: logApplicationId
启动应用程序时,出现以下错误:
The bean 'sqrt-numbers' could not be registered. A bean with that name has already been defined and overriding is disabled.
Action:
Consider renaming one of the beans or enabling overriding by setting spring.main.allow-bean-definition-overriding=true
当然,现在将
definition-overriding
设置为 true
并不是一个正确的解决方案,并且它会失败并显示 IllegalStateException
。
我该如何解决这个问题?
问题的重现可以在这里找到:https://github.com/cedric-schaller/dltawareprocessor-type-error
假设您有两个名为
numbers
和 sqrt-numbers
的 Kafka 主题,则以下配置应该有效。
spring:
cloud:
stream:
bindings:
sqrt-in-0:
destination: numbers
sqrt-out-0:
destination: sqrt-numbers
log-in-0:
destination: sqrt-numbers
kafka:
streams:
bindings:
sqrt-in-0:
consumer:
application-id: sqrtApplicationId
log-in-0:
consumer:
application-id: logApplicationId
您可以使用
spring.cloud.stream.function.bindings..
覆盖默认绑定名称。例如,如果您想将绑定名称从 sqrt-in-0
更改为 input
,您可以像 spring.cloud.stream.function.bindings.sqrt-in0-0: input
那样进行操作。不过,您仍然需要在覆盖的绑定上设置 destination
(通过 spring.cloud.stream.bindings.input.destination
)。
您遇到的特定异常是因为您试图重用已创建的绑定名称 -
sqrt-numbers
。